This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new ae8548282a6 [To rel/1.2] Add database and region tag in disk file metrics (#11282)
ae8548282a6 is described below
commit ae8548282a6b53825375029c406bb0ea05ebf034
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Oct 11 20:46:06 2023 -0500
[To rel/1.2] Add database and region tag in disk file metrics (#11282)
---
.../iotdb/db/service/metrics/FileMetrics.java | 540 ++-------------------
.../metrics/file/CompactionFileMetrics.java | 165 +++++++
.../db/service/metrics/file/ModsFileMetrics.java | 85 ++++
.../metrics/file/SystemRelatedFileMetrics.java | 109 +++++
.../db/service/metrics/file/TsFileMetrics.java | 414 ++++++++++++++++
.../db/service/metrics/file/WalFileMetrics.java | 58 +++
.../db/storageengine/dataregion/DataRegion.java | 54 ++-
.../execute/task/CrossSpaceCompactionTask.java | 25 +-
.../execute/task/InnerSpaceCompactionTask.java | 10 +-
.../utils/CompactionFileGeneratorUtils.java | 23 +-
10 files changed, 932 insertions(+), 551 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
index c57a3a8526e..7884f33bf84 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
@@ -19,544 +19,88 @@
package org.apache.iotdb.db.service.metrics;
-import org.apache.iotdb.commons.service.metric.enums.Metric;
-import org.apache.iotdb.commons.service.metric.enums.Tag;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
+import org.apache.iotdb.db.service.metrics.file.CompactionFileMetrics;
+import org.apache.iotdb.db.service.metrics.file.ModsFileMetrics;
+import org.apache.iotdb.db.service.metrics.file.SystemRelatedFileMetrics;
+import org.apache.iotdb.db.service.metrics.file.TsFileMetrics;
+import org.apache.iotdb.db.service.metrics.file.WalFileMetrics;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.metrics.AbstractMetricService;
-import org.apache.iotdb.metrics.MetricConstant;
-import org.apache.iotdb.metrics.config.MetricConfig;
-import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
-import org.apache.iotdb.metrics.type.Gauge;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.metrics.utils.MetricType;
-import org.apache.iotdb.metrics.utils.SystemType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
@SuppressWarnings("java:S6548") // do not warn about singleton class
public class FileMetrics implements IMetricSet {
- private static final Logger log = LoggerFactory.getLogger(FileMetrics.class);
- private static final MetricConfig METRIC_CONFIG =
- MetricConfigDescriptor.getInstance().getMetricConfig();
- private static final WALManager WAL_MANAGER = WALManager.getInstance();
- private final Runtime runtime = Runtime.getRuntime();
- private String[] getOpenFileNumberCommand;
-
- private AbstractMetricService metricService = null;
- private static final String FILE_LEVEL_COUNT = "file_level_count";
- private static final String FILE_LEVEL_SIZE = "file_level_size";
- private static final String SEQUENCE = "sequence";
- private static final String UNSEQUENCE = "unsequence";
- private static final String LEVEL = "level";
- private final AtomicLong seqFileSize = new AtomicLong(0);
- private final AtomicLong unseqFileSize = new AtomicLong(0);
- private final AtomicInteger seqFileNum = new AtomicInteger(0);
- private final AtomicInteger unseqFileNum = new AtomicInteger(0);
-
- private final AtomicInteger modFileNum = new AtomicInteger(0);
-
- private final AtomicLong modFileSize = new AtomicLong(0);
- private final Map<Integer, Integer> seqLevelTsFileCountMap = new
ConcurrentHashMap<>();
- private final Map<Integer, Integer> unseqLevelTsFileCountMap = new
ConcurrentHashMap<>();
- private final Map<Integer, Long> seqLevelTsFileSizeMap = new
ConcurrentHashMap<>();
- private final Map<Integer, Long> unseqLevelTsFileSizeMap = new
ConcurrentHashMap<>();
- private long lastUpdateTime = 0;
-
- // compaction temporal files
- private final AtomicLong innerSeqCompactionTempFileSize = new AtomicLong(0);
- private final AtomicLong innerUnseqCompactionTempFileSize = new
AtomicLong(0);
- private final AtomicLong crossCompactionTempFileSize = new AtomicLong(0);
- private final AtomicInteger innerSeqCompactionTempFileNum = new
AtomicInteger(0);
- private final AtomicInteger innerUnseqCompactionTempFileNum = new
AtomicInteger(0);
- private final AtomicInteger crossCompactionTempFileNum = new
AtomicInteger(0);
- private AtomicBoolean hasRemainData = new AtomicBoolean(false);
- private final Map<Integer, Gauge> seqLevelCountGaugeMap = new
ConcurrentHashMap<>();
- private final Map<Integer, Gauge> seqLevelSizeGaugeMap = new
ConcurrentHashMap<>();
- private final Map<Integer, Gauge> unseqLevelCountGaugeMap = new
ConcurrentHashMap<>();
- private final Map<Integer, Gauge> unseqLevelSizeGaugeMap = new
ConcurrentHashMap<>();
-
- @SuppressWarnings("squid:S1075")
- private String fileHandlerCntPathInLinux = "/proc/%s/fd";
-
- private FileMetrics() {
- fileHandlerCntPathInLinux = String.format(fileHandlerCntPathInLinux,
METRIC_CONFIG.getPid());
- }
-
- public static FileMetrics getInstance() {
- return FileMetricsInstanceHolder.INSTANCE;
- }
+ private static final TsFileMetrics TS_FILE_METRICS = new TsFileMetrics();
+ private static final ModsFileMetrics MODS_FILE_METRICS = new
ModsFileMetrics();
+ private static final CompactionFileMetrics COMPACTION_FILE_METRICS = new
CompactionFileMetrics();
+ private static final WalFileMetrics WAL_FILE_METRICS = new WalFileMetrics();
+ private static final SystemRelatedFileMetrics SYSTEM_RELATED_FILE_METRICS =
+ new SystemRelatedFileMetrics();
@Override
public void bindTo(AbstractMetricService metricService) {
- this.metricService = metricService;
- bindTsFileMetrics(metricService);
- bindWalFileMetrics(metricService);
- bindCompactionFileMetrics(metricService);
- bindSystemRelatedMetrics(metricService);
- }
-
- private void bindTsFileMetrics(AbstractMetricService metricService) {
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getFileSize(true),
- Tag.NAME.toString(),
- "seq");
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getFileSize(false),
- Tag.NAME.toString(),
- "unseq");
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- this,
- FileMetrics::getModFileSize,
- Tag.NAME.toString(),
- "mods");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getFileNum(true),
- Tag.NAME.toString(),
- "seq");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getFileNum(false),
- Tag.NAME.toString(),
- "unseq");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- this,
- FileMetrics::getModFileNum,
- Tag.NAME.toString(),
- "mods");
- checkIfThereRemainingData();
- }
-
- private void bindWalFileMetrics(AbstractMetricService metricService) {
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- WAL_MANAGER,
- WALManager::getTotalDiskUsage,
- Tag.NAME.toString(),
- "wal");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- WAL_MANAGER,
- WALManager::getTotalFileNum,
- Tag.NAME.toString(),
- "wal");
- }
-
- private void bindCompactionFileMetrics(AbstractMetricService metricService) {
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getInnerCompactionTempFileSize(true),
- Tag.NAME.toString(),
- "inner-seq-temp");
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getInnerCompactionTempFileSize(false),
- Tag.NAME.toString(),
- "inner-unseq-temp");
- metricService.createAutoGauge(
- Metric.FILE_SIZE.toString(),
- MetricLevel.CORE,
- this,
- FileMetrics::getCrossCompactionTempFileSize,
- Tag.NAME.toString(),
- "cross-temp");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getInnerCompactionTempFileNum(true),
- Tag.NAME.toString(),
- "inner-seq-temp");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- this,
- o -> o.getInnerCompactionTempFileNum(false),
- Tag.NAME.toString(),
- "inner-unseq-temp");
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.CORE,
- this,
- FileMetrics::getCrossCompactionTempFileNum,
- Tag.NAME.toString(),
- "cross-temp");
- }
-
- private void bindSystemRelatedMetrics(AbstractMetricService metricService) {
- if ((METRIC_CONFIG.getSystemType() == SystemType.LINUX
- || METRIC_CONFIG.getSystemType() == SystemType.MAC)
- && METRIC_CONFIG.getPid().length() != 0) {
- this.getOpenFileNumberCommand =
- new String[] {
- "/bin/sh", "-c", String.format("lsof -p %s | wc -l",
METRIC_CONFIG.getPid())
- };
- metricService.createAutoGauge(
- Metric.FILE_COUNT.toString(),
- MetricLevel.IMPORTANT,
- this,
- FileMetrics::getOpenFileHandlersNumber,
- Tag.NAME.toString(),
- "open_file_handlers");
- }
+ TS_FILE_METRICS.bindTo(metricService);
+ MODS_FILE_METRICS.bindTo(metricService);
+ COMPACTION_FILE_METRICS.bindTo(metricService);
+ WAL_FILE_METRICS.bindTo(metricService);
+ SYSTEM_RELATED_FILE_METRICS.bindTo(metricService);
}
@Override
public void unbindFrom(AbstractMetricService metricService) {
- unbindTsFileMetrics(metricService);
- unbindWalMetrics(metricService);
- unbindCompactionMetrics(metricService);
- unbindSystemRelatedMetrics(metricService);
- }
-
- private void unbindTsFileMetrics(AbstractMetricService metricService) {
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "seq");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "unseq");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "mods");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "seq");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "unseq");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "mods");
- }
-
- private void unbindWalMetrics(AbstractMetricService metricService) {
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "wal");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "wal");
- }
-
- private void unbindCompactionMetrics(AbstractMetricService metricService) {
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "inner-seq-temp");
- metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.FILE_COUNT.toString(),
- Tag.NAME.toString(),
- "inner-unseq-temp");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "cross-temp");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "inner-seq-temp");
- metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.FILE_SIZE.toString(),
- Tag.NAME.toString(),
- "inner-unseq-temp");
- metricService.remove(
- MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "cross-temp");
- }
-
- private void unbindSystemRelatedMetrics(AbstractMetricService metricService)
{
- if ((METRIC_CONFIG.getSystemType() == SystemType.LINUX
- || METRIC_CONFIG.getSystemType() == SystemType.MAC)
- && METRIC_CONFIG.getPid().length() != 0) {
- metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.FILE_COUNT.toString(),
- Tag.NAME.toString(),
- "open_file_handlers");
- }
- }
-
- private long getOpenFileHandlersNumber() {
- long fdCount = 0;
- try {
- if (METRIC_CONFIG.getSystemType() == SystemType.LINUX) {
- // count the fd in the system directory instead of
- // calling runtime.exec() which could be much slower
- File fdDir = new File(fileHandlerCntPathInLinux);
- if (fdDir.exists()) {
- File[] fds = fdDir.listFiles();
- fdCount = fds == null ? 0 : fds.length;
- }
- } else if ((METRIC_CONFIG.getSystemType() == SystemType.MAC)
- && METRIC_CONFIG.getPid().length() != 0) {
- Process process = runtime.exec(getOpenFileNumberCommand);
- StringBuilder result = new StringBuilder();
- try (BufferedReader input =
- new BufferedReader(new
InputStreamReader(process.getInputStream()))) {
- String line;
- while ((line = input.readLine()) != null) {
- result.append(line);
- }
- }
- fdCount = Long.parseLong(result.toString().trim());
- }
- } catch (IOException e) {
- log.warn("Failed to get open file number, because ", e);
- }
- return fdCount;
- }
-
- // following are update functions for tsfile metrics
- public void addFile(long size, boolean seq, String name) {
- updateGlobalCountAndSize(size, 1, seq);
- try {
- TsFileNameGenerator.TsFileName tsFileName =
TsFileNameGenerator.getTsFileName(name);
- int level = tsFileName.getInnerCompactionCnt();
- updateLevelCountAndSize(size, 1, seq, level);
- } catch (IOException e) {
- log.warn("Unexpected error occurred when getting tsfile name", e);
- }
- }
-
- private void updateGlobalCountAndSize(long sizeDelta, int countDelta,
boolean seq) {
- if (seq) {
- seqFileSize.getAndAdd(sizeDelta);
- seqFileNum.getAndAdd(countDelta);
- } else {
- unseqFileSize.getAndAdd(sizeDelta);
- unseqFileNum.getAndAdd(countDelta);
- }
+ TS_FILE_METRICS.unbindFrom(metricService);
+ MODS_FILE_METRICS.unbindFrom(metricService);
+ COMPACTION_FILE_METRICS.unbindFrom(metricService);
+ WAL_FILE_METRICS.unbindFrom(metricService);
+ SYSTEM_RELATED_FILE_METRICS.unbindFrom(metricService);
}
- private void updateLevelCountAndSize(long sizeDelta, int countDelta, boolean
seq, int level) {
- int count = 0;
- long totalSize = 0;
- if (seq) {
- count =
- seqLevelTsFileCountMap.compute(level, (k, v) -> v == null ?
countDelta : v + countDelta);
- totalSize =
- seqLevelTsFileSizeMap.compute(level, (k, v) -> v == null ? sizeDelta
: v + sizeDelta);
- } else {
- count =
- unseqLevelTsFileCountMap.compute(
- level, (k, v) -> v == null ? countDelta : v + countDelta);
- totalSize =
- unseqLevelTsFileSizeMap.compute(level, (k, v) -> v == null ?
sizeDelta : v + sizeDelta);
- }
- updateLevelFileInfoInMetricService(totalSize, count, seq, level);
- }
-
- private void updateLevelFileInfoInMetricService(
- long totalSize, int count, boolean seq, int level) {
- if (metricService != null) {
- updateCountGauge(
- level,
- count,
- seq ? seqLevelCountGaugeMap : unseqLevelCountGaugeMap,
- seq ? SEQUENCE : UNSEQUENCE);
- updateSizeGauge(
- level,
- totalSize,
- seq ? seqLevelSizeGaugeMap : unseqLevelSizeGaugeMap,
- seq ? SEQUENCE : UNSEQUENCE);
- checkIfThereRemainingData();
- } else {
- // the metric service has not been set yet
- hasRemainData.set(true);
- }
- }
-
- private void checkIfThereRemainingData() {
- if (hasRemainData.get()) {
- synchronized (this) {
- if (hasRemainData.get()) {
- hasRemainData.set(false);
- updateRemainData();
- }
- }
- }
- }
-
- private void updateCountGauge(
- int level, int count, Map<Integer, Gauge> countGaugeMap, String
orderStr) {
- countGaugeMap
- .computeIfAbsent(
- level,
- l ->
- metricService.getOrCreateGauge(
- FILE_LEVEL_COUNT,
- MetricLevel.CORE,
- Tag.TYPE.toString(),
- orderStr,
- LEVEL,
- String.valueOf(level)))
- .set(count);
- }
-
- private void updateSizeGauge(
- int level, long size, Map<Integer, Gauge> sizeGaugeMap, String orderStr)
{
- sizeGaugeMap
- .computeIfAbsent(
- level,
- l ->
- metricService.getOrCreateGauge(
- FILE_LEVEL_SIZE,
- MetricLevel.CORE,
- Tag.TYPE.toString(),
- orderStr,
- LEVEL,
- String.valueOf(level)))
- .set(size);
+ // region TsFile Related Metrics Update
+ public void addTsFile(String database, String regionId, long size, boolean
seq, String name) {
+ TS_FILE_METRICS.addTsFile(database, regionId, size, seq, name);
}
- public void deleteFile(long[] sizeList, boolean seq, List<String> names) {
- long totalSize = 0;
- for (long size : sizeList) {
- totalSize += size;
- }
- updateGlobalCountAndSize(-totalSize, -sizeList.length, seq);
- for (int i = 0, length = names.size(); i < length; ++i) {
- int level = -1;
- String name = names.get(i);
- long size = sizeList[i];
- try {
- TsFileNameGenerator.TsFileName tsFileName =
TsFileNameGenerator.getTsFileName(name);
- level = tsFileName.getInnerCompactionCnt();
- updateLevelCountAndSize(-size, -1, seq, level);
- } catch (IOException e) {
- log.warn("Unexpected error occurred when getting tsfile name", e);
- }
- }
+ public void deleteTsFile(boolean seq, List<TsFileResource>
tsFileResourceList) {
+ TS_FILE_METRICS.deleteFile(seq, tsFileResourceList);
}
- private void updateRemainData() {
- for (Map.Entry<Integer, Integer> entry :
seqLevelTsFileCountMap.entrySet()) {
- updateCountGauge(entry.getKey(), entry.getValue(),
seqLevelCountGaugeMap, SEQUENCE);
- }
- for (Map.Entry<Integer, Long> entry : seqLevelTsFileSizeMap.entrySet()) {
- updateSizeGauge(entry.getKey(), entry.getValue(), seqLevelSizeGaugeMap,
SEQUENCE);
- }
- for (Map.Entry<Integer, Integer> entry :
unseqLevelTsFileCountMap.entrySet()) {
- updateCountGauge(entry.getKey(), entry.getValue(),
unseqLevelCountGaugeMap, UNSEQUENCE);
- }
- for (Map.Entry<Integer, Long> entry : unseqLevelTsFileSizeMap.entrySet()) {
- updateSizeGauge(entry.getKey(), entry.getValue(),
unseqLevelSizeGaugeMap, UNSEQUENCE);
- }
+ public void deleteRegion(String database, String regionId) {
+ TS_FILE_METRICS.deleteRegion(database, regionId);
}
- public long getFileSize(boolean seq) {
- return seq ? seqFileSize.get() : unseqFileSize.get();
- }
+ // endregion
- public long getFileNum(boolean seq) {
- return seq ? seqFileNum.get() : unseqFileNum.get();
- }
-
- public int getModFileNum() {
- return modFileNum.get();
- }
-
- public long getModFileSize() {
- return modFileSize.get();
- }
+ // region Mods TsFile Related Metrics Update
public void increaseModFileNum(int num) {
- modFileNum.addAndGet(num);
+ MODS_FILE_METRICS.increaseModFileNum(num);
}
public void decreaseModFileNum(int num) {
- modFileNum.addAndGet(-num);
+ MODS_FILE_METRICS.decreaseModFileNum(num);
}
public void increaseModFileSize(long size) {
- modFileSize.addAndGet(size);
+ MODS_FILE_METRICS.increaseModFileSize(size);
}
public void decreaseModFileSize(long size) {
- modFileSize.addAndGet(-size);
- }
-
- public long getInnerCompactionTempFileSize(boolean seq) {
- updateCompactionTempSize();
- return seq ? innerSeqCompactionTempFileSize.get() :
innerUnseqCompactionTempFileSize.get();
+ MODS_FILE_METRICS.decreaseModFileSize(size);
}
- private synchronized void updateCompactionTempSize() {
- if (System.currentTimeMillis() - lastUpdateTime <=
MetricConstant.UPDATE_INTERVAL) {
- return;
- }
- lastUpdateTime = System.currentTimeMillis();
+ // endregion
- innerSeqCompactionTempFileSize.set(0);
- innerSeqCompactionTempFileNum.set(0);
- innerUnseqCompactionTempFileSize.set(0);
- innerUnseqCompactionTempFileNum.set(0);
- crossCompactionTempFileSize.set(0);
- crossCompactionTempFileNum.set(0);
+ private static class FileMetricsInstanceHolder {
+ private static final FileMetrics INSTANCE = new FileMetrics();
- List<AbstractCompactionTask> runningTasks =
- CompactionTaskManager.getInstance().getRunningCompactionTaskList();
- for (AbstractCompactionTask task : runningTasks) {
- CompactionTaskSummary summary = task.getSummary();
- if (task instanceof InnerSpaceCompactionTask) {
- if (task.isInnerSeqTask()) {
-
innerSeqCompactionTempFileSize.addAndGet(summary.getTemporalFileSize());
- innerSeqCompactionTempFileNum.addAndGet(1);
- } else {
-
innerUnseqCompactionTempFileSize.addAndGet(summary.getTemporalFileSize());
- innerUnseqCompactionTempFileNum.addAndGet(1);
- }
- } else {
- crossCompactionTempFileSize.addAndGet(summary.getTemporalFileSize());
- crossCompactionTempFileNum.addAndGet(summary.getTemporalFileNum());
- }
+ private FileMetricsInstanceHolder() {
+ // do nothing constructor
}
}
- public long getCrossCompactionTempFileSize() {
- updateCompactionTempSize();
- return crossCompactionTempFileSize.get();
- }
-
- public long getInnerCompactionTempFileNum(boolean seq) {
- updateCompactionTempSize();
- return seq ? innerSeqCompactionTempFileNum.get() :
innerUnseqCompactionTempFileNum.get();
- }
-
- public long getCrossCompactionTempFileNum() {
- updateCompactionTempSize();
- return crossCompactionTempFileNum.get();
- }
-
- private static class FileMetricsInstanceHolder {
- private static final FileMetrics INSTANCE = new FileMetrics();
-
- private FileMetricsInstanceHolder() {}
+ public static FileMetrics getInstance() {
+ return FileMetricsInstanceHolder.INSTANCE;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/CompactionFileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/CompactionFileMetrics.java
new file mode 100644
index 00000000000..ed90a7b5093
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/CompactionFileMetrics.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics.file;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.MetricConstant;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CompactionFileMetrics implements IMetricSet {
+ private static final String INNER_SEQ_TEMP = "inner-seq-temp";
+ private static final String INNER_UNSEQ_TEMP = "inner-unseq-temp";
+ private static final String CROSS_TEMP = "cross-temp";
+ // compaction temporal files
+ private final AtomicLong innerSeqCompactionTempFileSize = new AtomicLong(0);
+ private final AtomicLong innerUnseqCompactionTempFileSize = new
AtomicLong(0);
+ private final AtomicLong crossCompactionTempFileSize = new AtomicLong(0);
+ private final AtomicInteger innerSeqCompactionTempFileNum = new
AtomicInteger(0);
+ private final AtomicInteger innerUnseqCompactionTempFileNum = new
AtomicInteger(0);
+ private final AtomicInteger crossCompactionTempFileNum = new
AtomicInteger(0);
+
+ private long lastUpdateTime = 0;
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.FILE_SIZE.toString(),
+ MetricLevel.CORE,
+ this,
+ o -> o.getInnerCompactionTempFileSize(true),
+ Tag.NAME.toString(),
+ INNER_SEQ_TEMP);
+ metricService.createAutoGauge(
+ Metric.FILE_SIZE.toString(),
+ MetricLevel.CORE,
+ this,
+ o -> o.getInnerCompactionTempFileSize(false),
+ Tag.NAME.toString(),
+ INNER_UNSEQ_TEMP);
+ metricService.createAutoGauge(
+ Metric.FILE_SIZE.toString(),
+ MetricLevel.CORE,
+ this,
+ CompactionFileMetrics::getCrossCompactionTempFileSize,
+ Tag.NAME.toString(),
+ CROSS_TEMP);
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.CORE,
+ this,
+ o -> o.getInnerCompactionTempFileNum(true),
+ Tag.NAME.toString(),
+ INNER_SEQ_TEMP);
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.CORE,
+ this,
+ o -> o.getInnerCompactionTempFileNum(false),
+ Tag.NAME.toString(),
+ INNER_UNSEQ_TEMP);
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.CORE,
+ this,
+ CompactionFileMetrics::getCrossCompactionTempFileNum,
+ Tag.NAME.toString(),
+ CROSS_TEMP);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), INNER_SEQ_TEMP);
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), INNER_UNSEQ_TEMP);
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), CROSS_TEMP);
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), INNER_SEQ_TEMP);
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), INNER_UNSEQ_TEMP);
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), CROSS_TEMP);
+ }
+
+ public long getInnerCompactionTempFileSize(boolean seq) {
+ updateCompactionTempSize();
+ return seq ? innerSeqCompactionTempFileSize.get() :
innerUnseqCompactionTempFileSize.get();
+ }
+
+ private synchronized void updateCompactionTempSize() {
+ if (System.currentTimeMillis() - lastUpdateTime <=
MetricConstant.UPDATE_INTERVAL) {
+ return;
+ }
+ lastUpdateTime = System.currentTimeMillis();
+
+ innerSeqCompactionTempFileSize.set(0);
+ innerSeqCompactionTempFileNum.set(0);
+ innerUnseqCompactionTempFileSize.set(0);
+ innerUnseqCompactionTempFileNum.set(0);
+ crossCompactionTempFileSize.set(0);
+ crossCompactionTempFileNum.set(0);
+
+ List<AbstractCompactionTask> runningTasks =
+ CompactionTaskManager.getInstance().getRunningCompactionTaskList();
+ for (AbstractCompactionTask task : runningTasks) {
+ CompactionTaskSummary summary = task.getSummary();
+ if (task instanceof InnerSpaceCompactionTask) {
+ if (task.isInnerSeqTask()) {
+
innerSeqCompactionTempFileSize.addAndGet(summary.getTemporalFileSize());
+ innerSeqCompactionTempFileNum.addAndGet(1);
+ } else {
+
innerUnseqCompactionTempFileSize.addAndGet(summary.getTemporalFileSize());
+ innerUnseqCompactionTempFileNum.addAndGet(1);
+ }
+ } else {
+ crossCompactionTempFileSize.addAndGet(summary.getTemporalFileSize());
+ crossCompactionTempFileNum.addAndGet(summary.getTemporalFileNum());
+ }
+ }
+ }
+
+ public long getCrossCompactionTempFileSize() {
+ updateCompactionTempSize();
+ return crossCompactionTempFileSize.get();
+ }
+
+ public long getInnerCompactionTempFileNum(boolean seq) {
+ updateCompactionTempSize();
+ return seq ? innerSeqCompactionTempFileNum.get() :
innerUnseqCompactionTempFileNum.get();
+ }
+
+ public long getCrossCompactionTempFileNum() {
+ updateCompactionTempSize();
+ return crossCompactionTempFileNum.get();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/ModsFileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/ModsFileMetrics.java
new file mode 100644
index 00000000000..6e181c3a33f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/ModsFileMetrics.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics.file;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ModsFileMetrics implements IMetricSet {
+ private final AtomicInteger modFileNum = new AtomicInteger(0);
+ private final AtomicLong modFileSize = new AtomicLong(0);
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.FILE_SIZE.toString(),
+ MetricLevel.CORE,
+ this,
+ ModsFileMetrics::getModFileSize,
+ Tag.NAME.toString(),
+ "mods");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.CORE,
+ this,
+ ModsFileMetrics::getModFileNum,
+ Tag.NAME.toString(),
+ "mods");
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(),
Tag.NAME.toString(), "mods");
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(),
Tag.NAME.toString(), "mods");
+ }
+
+ private int getModFileNum() {
+ return modFileNum.get();
+ }
+
+ private long getModFileSize() {
+ return modFileSize.get();
+ }
+
+ public void increaseModFileNum(int num) {
+ modFileNum.addAndGet(num);
+ }
+
+ public void decreaseModFileNum(int num) {
+ modFileNum.addAndGet(-num);
+ }
+
+ public void increaseModFileSize(long size) {
+ modFileSize.addAndGet(size);
+ }
+
+ public void decreaseModFileSize(long size) {
+ modFileSize.addAndGet(-size);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
new file mode 100644
index 00000000000..3fac0d10a16
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics.file;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.config.MetricConfig;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+import org.apache.iotdb.metrics.utils.SystemType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Metric set reporting the number of open file handles of the process identified by
+ * {@code CONFIG.getPid()}.
+ *
+ * <p>The gauge is registered only when the system type is LINUX or MAC and the pid is not empty.
+ * On LINUX the entries of {@code /proc/<pid>/fd} are counted; on MAC the output of
+ * {@code lsof -p <pid> | wc -l} is parsed.
+ */
+public class SystemRelatedFileMetrics implements IMetricSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SystemRelatedFileMetrics.class);
+  private static final MetricConfig CONFIG = MetricConfigDescriptor.getInstance().getMetricConfig();
+  private final Runtime runtime = Runtime.getRuntime();
+  // shell command used on MAC; assigned in bindTo() before the gauge can be polled
+  private String[] getOpenFileNumberCommand;
+
+  @SuppressWarnings("squid:S1075")
+  private String fileHandlerCntPathInLinux = "/proc/%s/fd";
+
+  public SystemRelatedFileMetrics() {
+    // resolve the pid placeholder once, at construction time
+    fileHandlerCntPathInLinux = String.format(fileHandlerCntPathInLinux, CONFIG.getPid());
+  }
+
+  /** Registers the "open_file_handlers" auto gauge when the platform and pid allow it. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType() == SystemType.MAC)
+        && !CONFIG.getPid().isEmpty()) {
+      this.getOpenFileNumberCommand =
+          new String[] {"/bin/sh", "-c", String.format("lsof -p %s | wc -l", CONFIG.getPid())};
+      metricService.createAutoGauge(
+          Metric.FILE_COUNT.toString(),
+          MetricLevel.IMPORTANT,
+          this,
+          SystemRelatedFileMetrics::getOpenFileHandlersNumber,
+          Tag.NAME.toString(),
+          "open_file_handlers");
+    }
+  }
+
+  /**
+   * Returns the current number of open file handles, or 0 when it cannot be determined.
+   *
+   * <p>Failures are logged and never propagated, so polling the gauge cannot throw because of a
+   * missing or misbehaving {@code lsof}.
+   */
+  private long getOpenFileHandlersNumber() {
+    long fdCount = 0;
+    try {
+      if (CONFIG.getSystemType() == SystemType.LINUX) {
+        // count the fd in the system directory instead of
+        // calling runtime.exec() which could be much slower
+        File fdDir = new File(fileHandlerCntPathInLinux);
+        if (fdDir.exists()) {
+          File[] fds = fdDir.listFiles();
+          fdCount = fds == null ? 0 : fds.length;
+        }
+      } else if ((CONFIG.getSystemType() == SystemType.MAC) && !CONFIG.getPid().isEmpty()) {
+        fdCount = countOpenFilesWithLsof();
+      }
+    } catch (IOException | NumberFormatException e) {
+      // NumberFormatException: the command printed nothing or something that is not a number
+      LOGGER.warn("Failed to get open file number, because ", e);
+    }
+    return fdCount;
+  }
+
+  /**
+   * Runs the lsof command prepared in {@link #bindTo(AbstractMetricService)} and parses its
+   * output.
+   *
+   * @return the number printed by the command
+   * @throws IOException if the command cannot be started or its output cannot be read
+   * @throws NumberFormatException if the output is not a decimal number
+   */
+  private long countOpenFilesWithLsof() throws IOException {
+    Process process = runtime.exec(getOpenFileNumberCommand);
+    try {
+      StringBuilder result = new StringBuilder();
+      try (BufferedReader input =
+          new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+        String line;
+        while ((line = input.readLine()) != null) {
+          result.append(line);
+        }
+      }
+      return Long.parseLong(result.toString().trim());
+    } finally {
+      // release the child's remaining pipes; otherwise every poll leaks file descriptors
+      process.destroy();
+    }
+  }
+
+  /** Removes the gauge under the same condition that {@code bindTo} used to register it. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType() == SystemType.MAC)
+        && !CONFIG.getPid().isEmpty()) {
+      metricService.remove(
+          MetricType.AUTO_GAUGE,
+          Metric.FILE_COUNT.toString(),
+          Tag.NAME.toString(),
+          "open_file_handlers");
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
new file mode 100644
index 00000000000..032a0316c11
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics.file;
+
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Gauge;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Metric set that keeps TsFile counts and sizes per database/region ("global") and per level,
+ * and mirrors each cached value into a gauge.
+ *
+ * <p>Updates may arrive before {@link #bindTo(AbstractMetricService)} is called. In that case
+ * only the cached value changes and {@code hasRemainData} is raised; the gauges are created and
+ * filled from the cached values once a metric service is bound.
+ */
+public class TsFileMetrics implements IMetricSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileMetrics.class);
+  private final AtomicReference<AbstractMetricService> metricService = new AtomicReference<>();
+  // true when at least one value was recorded while no metric service was bound
+  private final AtomicBoolean hasRemainData = new AtomicBoolean(false);
+  private static final String SEQUENCE = "sequence";
+  private static final String UNSEQUENCE = "unsequence";
+  private static final String LEVEL = "level";
+  private static final String FILE_GLOBAL_COUNT = "file_global_count";
+  private static final String FILE_GLOBAL_SIZE = "file_global_size";
+  private static final String FILE_LEVEL_COUNT = "file_level_count";
+  private static final String FILE_LEVEL_SIZE = "file_level_size";
+
+  // database -> regionId -> sequence file size
+  private final Map<String, Map<String, Pair<Long, Gauge>>> seqFileSizeMap =
+      new ConcurrentHashMap<>();
+  // database -> regionId -> unsequence file size
+  private final Map<String, Map<String, Pair<Long, Gauge>>> unseqFileSizeMap =
+      new ConcurrentHashMap<>();
+  // database -> regionId -> sequence file count
+  private final Map<String, Map<String, Pair<Integer, Gauge>>> seqFileCountMap =
+      new ConcurrentHashMap<>();
+  // database -> regionId -> unsequence file count
+  private final Map<String, Map<String, Pair<Integer, Gauge>>> unseqFileCountMap =
+      new ConcurrentHashMap<>();
+
+  // level -> sequence file count
+  private final Map<Integer, Pair<Integer, Gauge>> seqLevelTsFileCountMap =
+      new ConcurrentHashMap<>();
+  // level -> unsequence file count
+  private final Map<Integer, Pair<Integer, Gauge>> unseqLevelTsFileCountMap =
+      new ConcurrentHashMap<>();
+  // level -> sequence file size
+  private final Map<Integer, Pair<Long, Gauge>> seqLevelTsFileSizeMap = new ConcurrentHashMap<>();
+  // level -> unsequence file size
+  private final Map<Integer, Pair<Long, Gauge>> unseqLevelTsFileSizeMap = new ConcurrentHashMap<>();
+
+  /** Stores the metric service and publishes every value that was recorded before binding. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    this.metricService.set(metricService);
+    checkIfThereRemainingData();
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    // do nothing here
+  }
+
+  // region external update tsfile related metrics
+
+  /**
+   * Records one new TsFile of {@code size} bytes for the given database and region.
+   *
+   * <p>The level is taken from the inner compaction count parsed out of {@code name}. If the name
+   * cannot be parsed, the global metrics are still updated and only the level metrics are skipped.
+   */
+  public void addTsFile(String database, String regionId, long size, boolean seq, String name) {
+    updateGlobalTsFileCountAndSize(database, regionId, 1, size, seq);
+    try {
+      TsFileNameGenerator.TsFileName tsFileName = TsFileNameGenerator.getTsFileName(name);
+      int level = tsFileName.getInnerCompactionCnt();
+      updateLevelTsFileCountAndSize(size, 1, seq, level);
+    } catch (IOException e) {
+      LOGGER.warn("Unexpected error occurred when getting tsfile name", e);
+    }
+  }
+
+  /**
+   * Removes every resource in {@code tsFileResourceList} from the global and level metrics, using
+   * the database, region, size and file name reported by each resource.
+   */
+  public void deleteFile(boolean seq, List<TsFileResource> tsFileResourceList) {
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      String name = tsFileResource.getTsFile().getName();
+      long size = tsFileResource.getTsFileSize();
+      updateGlobalTsFileCountAndSize(
+          tsFileResource.getDatabaseName(), tsFileResource.getDataRegionId(), -1, -size, seq);
+      try {
+        TsFileNameGenerator.TsFileName tsFileName = TsFileNameGenerator.getTsFileName(name);
+        int level = tsFileName.getInnerCompactionCnt();
+        updateLevelTsFileCountAndSize(-size, -1, seq, level);
+      } catch (IOException e) {
+        LOGGER.warn("Unexpected error occurred when getting tsfile name", e);
+      }
+    }
+  }
+
+  /**
+   * Drops the cached global count and size entries of one region.
+   *
+   * <p>NOTE(review): only the cached entries are removed here; nothing in this method removes the
+   * corresponding gauges from the metric service, and the level maps are not touched.
+   */
+  public void deleteRegion(String database, String regionId) {
+    Arrays.asList(seqFileCountMap, unseqFileCountMap)
+        .forEach(map -> deleteRegionFromMap(map, database, regionId));
+    Arrays.asList(seqFileSizeMap, unseqFileSizeMap)
+        .forEach(map -> deleteRegionFromMap(map, database, regionId));
+  }
+
+  /** Removes {@code regionId} under {@code database} and drops the database once it is empty. */
+  private <T> void deleteRegionFromMap(
+      Map<String, Map<String, T>> map, String database, String regionId) {
+    map.computeIfPresent(
+        database,
+        (k, v) -> {
+          v.remove(regionId);
+          return v.isEmpty() ? null : v;
+        });
+  }
+
+  // endregion
+
+  // region update global tsfile value map and gauge map
+
+  /** Applies the count and size deltas to the sequence or unsequence global maps. */
+  private void updateGlobalTsFileCountAndSize(
+      String database, String regionId, int countDelta, long sizeDelta, boolean seq) {
+    updateGlobalTsFileCountMap(
+        (seq ? seqFileCountMap : unseqFileCountMap),
+        (seq ? SEQUENCE : UNSEQUENCE),
+        database,
+        regionId,
+        countDelta);
+    updateGlobalTsFileSizeMap(
+        (seq ? seqFileSizeMap : unseqFileSizeMap),
+        (seq ? SEQUENCE : UNSEQUENCE),
+        database,
+        regionId,
+        sizeDelta);
+  }
+
+  /**
+   * Adds {@code value} (a delta, possibly negative or zero) to the cached count of the region,
+   * creates the gauge if it is missing and a metric service is bound, and copies the cached count
+   * into the gauge.
+   */
+  private void updateGlobalTsFileCountMap(
+      Map<String, Map<String, Pair<Integer, Gauge>>> map,
+      String orderStr,
+      String database,
+      String regionId,
+      int value) {
+    Map<String, Pair<Integer, Gauge>> innerMap =
+        map.computeIfAbsent(database, k -> new ConcurrentHashMap<>());
+    innerMap.compute(
+        regionId,
+        (k, v) -> {
+          // add pair if regionId not exists, and update value
+          if (v == null) {
+            v = new Pair<>(value, null);
+          } else {
+            v.setLeft(v.getLeft() + value);
+          }
+          // try to add gauge if gauge not exists
+          if (v.getRight() == null) {
+            if (metricService.get() != null) {
+              v.setRight(getOrCreateGlobalTsFileCountGauge(orderStr, database, regionId));
+            } else {
+              hasRemainData.set(true);
+            }
+          }
+          // try to update gauge
+          if (v.getRight() != null) {
+            v.getRight().set(v.getLeft());
+          }
+          return v;
+        });
+  }
+
+  /** Gets or creates the global file count gauge; requires a bound metric service. */
+  public Gauge getOrCreateGlobalTsFileCountGauge(
+      String orderStr, String database, String regionId) {
+    return metricService
+        .get()
+        .getOrCreateGauge(
+            FILE_GLOBAL_COUNT,
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            orderStr,
+            Tag.DATABASE.toString(),
+            database,
+            Tag.REGION.toString(),
+            regionId);
+  }
+
+  /**
+   * Adds {@code value} (a delta, possibly negative or zero) to the cached size of the region,
+   * creates the gauge if it is missing and a metric service is bound, and copies the cached size
+   * into the gauge.
+   */
+  private void updateGlobalTsFileSizeMap(
+      Map<String, Map<String, Pair<Long, Gauge>>> map,
+      String orderStr,
+      String database,
+      String regionId,
+      long value) {
+    Map<String, Pair<Long, Gauge>> innerMap =
+        map.computeIfAbsent(database, k -> new ConcurrentHashMap<>());
+    innerMap.compute(
+        regionId,
+        (k, v) -> {
+          // add pair if regionId not exists, and update value
+          if (v == null) {
+            v = new Pair<>(value, null);
+          } else {
+            v.setLeft(v.getLeft() + value);
+          }
+          // try to add gauge if gauge not exists
+          if (v.getRight() == null) {
+            if (metricService.get() != null) {
+              v.setRight(getOrCreateGlobalTsFileSizeGauge(orderStr, database, regionId));
+            } else {
+              hasRemainData.set(true);
+            }
+          }
+          // try to update gauge
+          if (v.getRight() != null) {
+            v.getRight().set(v.getLeft());
+          }
+          return v;
+        });
+  }
+
+  /** Gets or creates the global file size gauge; requires a bound metric service. */
+  public Gauge getOrCreateGlobalTsFileSizeGauge(String orderStr, String database, String regionId) {
+    return metricService
+        .get()
+        .getOrCreateGauge(
+            FILE_GLOBAL_SIZE,
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            orderStr,
+            Tag.DATABASE.toString(),
+            database,
+            Tag.REGION.toString(),
+            regionId);
+  }
+
+  // endregion
+
+  // region update level tsfile value map and gauge map
+
+  /** Applies the count and size deltas to the sequence or unsequence level maps. */
+  private void updateLevelTsFileCountAndSize(
+      long sizeDelta, int countDelta, boolean seq, int level) {
+    updateLevelTsFileCountMap(
+        seq ? seqLevelTsFileCountMap : unseqLevelTsFileCountMap,
+        seq ? SEQUENCE : UNSEQUENCE,
+        level,
+        countDelta);
+    updateLevelTsFileSizeMap(
+        seq ? seqLevelTsFileSizeMap : unseqLevelTsFileSizeMap,
+        seq ? SEQUENCE : UNSEQUENCE,
+        level,
+        sizeDelta);
+  }
+
+  /**
+   * Adds {@code value} (a delta, possibly negative or zero) to the cached count of the level,
+   * creates the gauge if it is missing and a metric service is bound, and copies the cached count
+   * into the gauge.
+   */
+  private void updateLevelTsFileCountMap(
+      Map<Integer, Pair<Integer, Gauge>> map, String orderStr, int level, int value) {
+    map.compute(
+        level,
+        (k, v) -> {
+          // add pair if level not exists, and update value
+          if (v == null) {
+            v = new Pair<>(value, null);
+          } else {
+            v.setLeft(v.getLeft() + value);
+          }
+          // try to add gauge if gauge not exists
+          if (v.getRight() == null) {
+            if (metricService.get() != null) {
+              v.setRight(getOrCreateLevelTsFileCountGauge(orderStr, level));
+            } else {
+              hasRemainData.set(true);
+            }
+          }
+          // try to update gauge
+          if (v.getRight() != null) {
+            v.getRight().set(v.getLeft());
+          }
+          return v;
+        });
+  }
+
+  /** Gets or creates the per-level file count gauge; requires a bound metric service. */
+  public Gauge getOrCreateLevelTsFileCountGauge(String orderStr, int level) {
+    return metricService
+        .get()
+        .getOrCreateGauge(
+            FILE_LEVEL_COUNT,
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            orderStr,
+            LEVEL,
+            String.valueOf(level));
+  }
+
+  /**
+   * Adds {@code value} (a delta, possibly negative or zero) to the cached size of the level,
+   * creates the gauge if it is missing and a metric service is bound, and copies the cached size
+   * into the gauge.
+   */
+  private void updateLevelTsFileSizeMap(
+      Map<Integer, Pair<Long, Gauge>> map, String orderStr, int level, long value) {
+    map.compute(
+        level,
+        (k, v) -> {
+          // add pair if level not exists, and update value
+          if (v == null) {
+            v = new Pair<>(value, null);
+          } else {
+            v.setLeft(v.getLeft() + value);
+          }
+          // try to add gauge if gauge not exists
+          if (v.getRight() == null) {
+            if (metricService.get() != null) {
+              v.setRight(getOrCreateLevelTsFileSizeGauge(orderStr, level));
+            } else {
+              hasRemainData.set(true);
+            }
+          }
+          // try to update gauge
+          if (v.getRight() != null) {
+            v.getRight().set(v.getLeft());
+          }
+          return v;
+        });
+  }
+
+  /** Gets or creates the per-level file size gauge; requires a bound metric service. */
+  public Gauge getOrCreateLevelTsFileSizeGauge(String orderStr, int level) {
+    return metricService
+        .get()
+        .getOrCreateGauge(
+            FILE_LEVEL_SIZE,
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            orderStr,
+            LEVEL,
+            String.valueOf(level));
+  }
+
+  // endregion
+
+  // region check remain data
+
+  /** Publishes values recorded before binding, at most once per raised {@code hasRemainData}. */
+  private void checkIfThereRemainingData() {
+    if (hasRemainData.get()) {
+      synchronized (this) {
+        if (hasRemainData.get()) {
+          hasRemainData.set(false);
+          updateRemainData(true);
+          updateRemainData(false);
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates the missing gauges for every cached entry and copies the cached value into them.
+   *
+   * <p>The update methods treat their last argument as a delta that is added to the cached value.
+   * The cached values are already correct here, so a delta of zero is passed; replaying the cached
+   * value itself would double every count and size recorded before the metric service was bound.
+   */
+  private synchronized void updateRemainData(boolean seq) {
+    String orderStr = seq ? SEQUENCE : UNSEQUENCE;
+
+    Map<String, Map<String, Pair<Integer, Gauge>>> globalCountMap =
+        seq ? seqFileCountMap : unseqFileCountMap;
+    for (Map.Entry<String, Map<String, Pair<Integer, Gauge>>> entry : globalCountMap.entrySet()) {
+      for (String regionId : entry.getValue().keySet()) {
+        updateGlobalTsFileCountMap(globalCountMap, orderStr, entry.getKey(), regionId, 0);
+      }
+    }
+
+    Map<String, Map<String, Pair<Long, Gauge>>> globalSizeMap =
+        seq ? seqFileSizeMap : unseqFileSizeMap;
+    for (Map.Entry<String, Map<String, Pair<Long, Gauge>>> entry : globalSizeMap.entrySet()) {
+      for (String regionId : entry.getValue().keySet()) {
+        updateGlobalTsFileSizeMap(globalSizeMap, orderStr, entry.getKey(), regionId, 0L);
+      }
+    }
+
+    Map<Integer, Pair<Integer, Gauge>> levelCountMap =
+        seq ? seqLevelTsFileCountMap : unseqLevelTsFileCountMap;
+    for (Integer level : levelCountMap.keySet()) {
+      updateLevelTsFileCountMap(levelCountMap, orderStr, level, 0);
+    }
+
+    Map<Integer, Pair<Long, Gauge>> levelSizeMap =
+        seq ? seqLevelTsFileSizeMap : unseqLevelTsFileSizeMap;
+    for (Integer level : levelSizeMap.keySet()) {
+      updateLevelTsFileSizeMap(levelSizeMap, orderStr, level, 0L);
+    }
+  }
+
+  // endregion
+
+  /** Sums the cached global file counts of all databases and regions. */
+  @TestOnly
+  public long getFileCount(boolean seq) {
+    long fileCount = 0;
+    for (Map.Entry<String, Map<String, Pair<Integer, Gauge>>> entry :
+        (seq ? seqFileCountMap : unseqFileCountMap).entrySet()) {
+      for (Map.Entry<String, Pair<Integer, Gauge>> regionEntry : entry.getValue().entrySet()) {
+        fileCount += regionEntry.getValue().getLeft();
+      }
+    }
+    return fileCount;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/WalFileMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/WalFileMetrics.java
new file mode 100644
index 00000000000..5edac3e6fae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/WalFileMetrics.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics.file;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metric set exposing the total disk usage and total file number reported by the
+ * {@link WALManager} singleton as auto gauges tagged with name "wal".
+ */
+public class WalFileMetrics implements IMetricSet {
+  private static final WALManager WAL_MANAGER = WALManager.getInstance();
+
+  /** Registers the size gauge and the count gauge, both backed by the WAL manager. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    final String nameTag = Tag.NAME.toString();
+    final String sizeMetric = Metric.FILE_SIZE.toString();
+    final String countMetric = Metric.FILE_COUNT.toString();
+    metricService.createAutoGauge(
+        sizeMetric, MetricLevel.CORE, WAL_MANAGER, WALManager::getTotalDiskUsage, nameTag, "wal");
+    metricService.createAutoGauge(
+        countMetric, MetricLevel.CORE, WAL_MANAGER, WALManager::getTotalFileNum, nameTag, "wal");
+  }
+
+  /** Removes the two auto gauges registered by {@link #bindTo(AbstractMetricService)}. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    final String nameTag = Tag.NAME.toString();
+    metricService.remove(MetricType.AUTO_GAUGE, Metric.FILE_SIZE.toString(), nameTag, "wal");
+    metricService.remove(MetricType.AUTO_GAUGE, Metric.FILE_COUNT.toString(), nameTag, "wal");
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index fe5ad69af70..92d7cf7472e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -443,7 +443,12 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileResource resource : value) {
if (resource.resourceFileExists()) {
FileMetrics.getInstance()
- .addFile(resource.getTsFile().length(), true,
resource.getTsFile().getName());
+ .addTsFile(
+ resource.getDatabaseName(),
+ resource.getDataRegionId(),
+ resource.getTsFile().length(),
+ true,
+ resource.getTsFile().getName());
if (resource.getModFile().exists()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize());
@@ -469,7 +474,12 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileResource resource : value) {
if (resource.resourceFileExists()) {
FileMetrics.getInstance()
- .addFile(resource.getTsFile().length(), false,
resource.getTsFile().getName());
+ .addTsFile(
+ resource.getDatabaseName(),
+ resource.getDataRegionId(),
+ resource.getTsFile().length(),
+ false,
+ resource.getTsFile().getName());
}
if (resource.getModFile().exists()) {
FileMetrics.getInstance().increaseModFileNum(1);
@@ -698,7 +708,9 @@ public class DataRegion implements IDataRegionForQuery {
updateLastFlushTime(tsFileResource, isSeq);
tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
FileMetrics.getInstance()
- .addFile(
+ .addTsFile(
+ tsFileResource.getDatabaseName(),
+ tsFileResource.getDataRegionId(),
tsFileResource.getTsFile().length(),
recoverPerformer.isSequence(),
tsFileResource.getTsFile().getName());
@@ -1441,11 +1453,7 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResourceList.addAll(tsFileManager.getTsFileList(false));
tsFileResourceList.forEach(
x -> {
- FileMetrics.getInstance()
- .deleteFile(
- new long[] {x.getTsFileSize()},
- x.isSeq(),
- Collections.singletonList(x.getTsFile().getName()));
+ FileMetrics.getInstance().deleteTsFile(x.isSeq(),
Collections.singletonList(x));
if (x.getModFile().exists()) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());
@@ -1520,11 +1528,7 @@ public class DataRegion implements IDataRegionForQuery {
try {
// try to delete physical data file
resource.remove();
- FileMetrics.getInstance()
- .deleteFile(
- new long[] {resource.getTsFileSize()},
- isSeq,
- Collections.singletonList(resource.getTsFile().getName()));
+ FileMetrics.getInstance().deleteTsFile(isSeq,
Collections.singletonList(resource));
logger.info(
"Removed a file {} before {} by ttl ({} {})",
resource.getTsFilePath(),
@@ -2069,11 +2073,14 @@ public class DataRegion implements IDataRegionForQuery {
synchronized (closeStorageGroupCondition) {
closeStorageGroupCondition.notifyAll();
}
+ TsFileResource tsFileResource = tsFileProcessor.getTsFileResource();
FileMetrics.getInstance()
- .addFile(
- tsFileProcessor.getTsFileResource().getTsFileSize(),
+ .addTsFile(
+ tsFileResource.getDatabaseName(),
+ tsFileResource.getDataRegionId(),
+ tsFileResource.getTsFileSize(),
tsFileProcessor.isSequence(),
- tsFileProcessor.getTsFileResource().getTsFile().getName());
+ tsFileResource.getTsFile().getName());
logger.info("signal closing database condition in {}", databaseName + "-"
+ dataRegionId);
}
@@ -2197,7 +2204,9 @@ public class DataRegion implements IDataRegionForQuery {
.listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
FileMetrics.getInstance()
- .addFile(
+ .addTsFile(
+ newTsFileResource.getDatabaseName(),
+ newTsFileResource.getDataRegionId(),
newTsFileResource.getTsFile().length(),
false,
newTsFileResource.getTsFile().getName());
@@ -2487,10 +2496,7 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResourceToBeMoved = sequenceResource;
tsFileManager.remove(tsFileResourceToBeMoved, true);
FileMetrics.getInstance()
- .deleteFile(
- new long[] {tsFileResourceToBeMoved.getTsFileSize()},
- true,
-
Collections.singletonList(tsFileResourceToBeMoved.getTsFile().getName()));
+ .deleteTsFile(true,
Collections.singletonList(tsFileResourceToBeMoved));
break;
}
}
@@ -2502,10 +2508,7 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResourceToBeMoved = unsequenceResource;
tsFileManager.remove(tsFileResourceToBeMoved, false);
FileMetrics.getInstance()
- .deleteFile(
- new long[] {tsFileResourceToBeMoved.getTsFileSize()},
- false,
-
Collections.singletonList(tsFileResourceToBeMoved.getTsFile().getName()));
+ .deleteTsFile(false,
Collections.singletonList(tsFileResourceToBeMoved));
break;
}
}
@@ -2924,6 +2927,7 @@ public class DataRegion implements IDataRegionForQuery {
if (!deleted) {
deletedCondition.await();
}
+ FileMetrics.getInstance().deleteRegion(databaseName, dataRegionId);
} catch (InterruptedException e) {
logger.error("Interrupted When waiting for data region deleted.");
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 2af8ca30ae4..6ac84e046c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -211,22 +211,25 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
.decreaseModFileSize(unsequenceResource.getModFile().getSize());
}
}
-
- long[] sequenceFileSize = deleteOldFiles(selectedSequenceFiles);
+ deleteOldFiles(selectedSequenceFiles);
List<String> fileNames = new ArrayList<>(selectedSequenceFiles.size());
selectedSequenceFiles.forEach(x ->
fileNames.add(x.getTsFile().getName()));
- FileMetrics.getInstance().deleteFile(sequenceFileSize, true,
fileNames);
+ FileMetrics.getInstance().deleteTsFile(true, selectedSequenceFiles);
fileNames.clear();
selectedUnsequenceFiles.forEach(x ->
fileNames.add(x.getTsFile().getName()));
- long[] unsequenceFileSize = deleteOldFiles(selectedUnsequenceFiles);
- FileMetrics.getInstance().deleteFile(unsequenceFileSize, false,
fileNames);
+ deleteOldFiles(selectedUnsequenceFiles);
+ FileMetrics.getInstance().deleteTsFile(false, selectedUnsequenceFiles);
CompactionUtils.deleteCompactionModsFile(selectedSequenceFiles,
selectedUnsequenceFiles);
for (TsFileResource targetResource : targetTsfileResourceList) {
if (!targetResource.isDeleted()) {
FileMetrics.getInstance()
- .addFile(
- targetResource.getTsFileSize(), true,
targetResource.getTsFile().getName());
+ .addTsFile(
+ targetResource.getDatabaseName(),
+ targetResource.getDataRegionId(),
+ targetResource.getTsFileSize(),
+ true,
+ targetResource.getTsFile().getName());
// set target resources to CLOSED, so that they can be selected to
compact
targetResource.setStatus(TsFileResourceStatus.NORMAL);
@@ -357,17 +360,13 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
return equalsOtherTask((CrossSpaceCompactionTask) other);
}
- private long[] deleteOldFiles(List<TsFileResource> tsFileResourceList) {
- long[] size = new long[tsFileResourceList.size()];
- for (int i = 0, length = tsFileResourceList.size(); i < length; ++i) {
- TsFileResource tsFileResource = tsFileResourceList.get(i);
- size[i] = tsFileResource.getTsFileSize();
+ private void deleteOldFiles(List<TsFileResource> tsFileResourceList) {
+ for (TsFileResource tsFileResource : tsFileResourceList) {
tsFileResource.remove();
LOGGER.info(
"[CrossSpaceCompaction] Delete TsFile :{}.",
tsFileResource.getTsFile().getAbsolutePath());
}
- return size;
}
private void releaseReadAndLockWrite(List<TsFileResource>
tsFileResourceList) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index c6a6762f932..ba2fe18980e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -262,7 +262,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
// inner space compaction task has only one target file
if (!targetTsFileResource.isDeleted()) {
FileMetrics.getInstance()
- .addFile(
+ .addTsFile(
+ targetTsFileResource.getDatabaseName(),
+ targetTsFileResource.getDataRegionId(),
targetTsFileResource.getTsFile().length(),
sequence,
targetTsFileResource.getTsFile().getName());
@@ -273,11 +275,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
// target resource is empty after compaction, then delete it
targetTsFileResource.remove();
}
- List<String> fileNames = new ArrayList<>();
- for (TsFileResource resource : selectedTsFileResourceList) {
- fileNames.add(resource.getTsFile().getName());
- }
- FileMetrics.getInstance().deleteFile(sizeList, sequence, fileNames);
+ FileMetrics.getInstance().deleteTsFile(sequence,
selectedTsFileResourceList);
CompactionMetrics.getInstance().recordSummaryInfo(summary);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java
index 483bb922fa6..2195664cf91 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java
@@ -84,15 +84,20 @@ public class CompactionFileGeneratorUtils {
if (sequence) {
return new TsFileResource(
new File(
- TestConstant.BASE_OUTPUT_PATH.concat(
- index
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + index
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
+ TestConstant.BASE_OUTPUT_PATH
+ .concat("database")
+ .concat(File.separator)
+ .concat("regionId")
+ .concat(File.separator)
+ .concat(
+ index
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + index
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
} else {
return new TsFileResource(
new File(