This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-4423 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bb14d147aca035144d0832aaa895f48d19e05c0c Author: Liu Xuxin <[email protected]> AuthorDate: Tue Sep 27 17:16:44 2022 +0800 add FileMetrics --- .../iotdb/db/engine/TsFileMetricManager.java | 77 ++++++++++++++++++++++ .../compaction/cross/CrossSpaceCompactionTask.java | 12 ++++ .../compaction/inner/InnerSpaceCompactionTask.java | 7 ++ .../writer/CrossSpaceCompactionWriter.java | 6 +- .../iotdb/db/engine/storagegroup/DataRegion.java | 5 ++ .../db/engine/storagegroup/TsFileProcessor.java | 2 + .../db/service/metrics/predefined/FileMetrics.java | 61 ++--------------- 7 files changed, 111 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java b/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java new file mode 100644 index 0000000000..62d99b59a0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine; + +import org.apache.iotdb.metrics.config.MetricConfigDescriptor; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class collect the number and size of tsfile, and send it to the {@link + * org.apache.iotdb.db.service.metrics.predefined.FileMetrics} + */ +public class TsFileMetricManager { + private static final TsFileMetricManager INSTANCE = new TsFileMetricManager(); + private AtomicLong seqFileSize = new AtomicLong(0); + private AtomicLong unseqFileSize = new AtomicLong(0); + private AtomicInteger seqFileNum = new AtomicInteger(0); + private AtomicInteger unseqFileNum = new AtomicInteger(0); + + private TsFileMetricManager() {} + + public static TsFileMetricManager getInstance() { + return INSTANCE; + } + + public void addFile(long size, boolean seq) { + if (!MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) { + return; + } + if (seq) { + seqFileSize.getAndAdd(size); + seqFileNum.incrementAndGet(); + } else { + unseqFileSize.getAndAdd(size); + unseqFileNum.incrementAndGet(); + } + } + + public void deleteFile(long size, boolean seq) { + if (!MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) { + return; + } + if (seq) { + seqFileSize.getAndAdd(-size); + seqFileNum.getAndAdd(-1); + } else { + unseqFileSize.getAndAdd(-size); + unseqFileNum.getAndAdd(-1); + } + } + + public long getFileSize(boolean seq) { + return seq ? seqFileSize.get() : unseqFileSize.get(); + } + + public long getFileNum(boolean seq) { + return seq ? seqFileNum.get() : unseqFileNum.get(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java index 650404a6fc..a966c93f7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.cross; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.engine.compaction.CompactionExceptionHandler; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; @@ -169,6 +170,17 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { deleteOldFiles(selectedSequenceFiles); deleteOldFiles(selectedUnsequenceFiles); + + for (TsFileResource seqResource : selectedSequenceFiles) { + TsFileMetricManager.getInstance().deleteFile(seqResource.getTsFileSize(), true); + } + for (TsFileResource unseqResource : selectedUnsequenceFiles) { + TsFileMetricManager.getInstance().deleteFile(unseqResource.getTsFileSize(), false); + } + for (TsFileResource targetResource : targetTsfileResourceList) { + TsFileMetricManager.getInstance().addFile(targetResource.getTsFileSize(), true); + } + CompactionUtils.deleteCompactionModsFile(selectedSequenceFiles, selectedUnsequenceFiles); if (logFile.exists()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java index c133947421..92124a4db4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.inner; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.engine.compaction.CompactionExceptionHandler; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; @@ -206,6 +207,12 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); CompactionUtils.deleteModificationForSourceFile( selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); + for (TsFileResource resource : selectedTsFileResourceList) { + TsFileMetricManager.getInstance().deleteFile(resource.getTsFile().length(), sequence); + } + // inner space compaction task has only one target file + TsFileMetricManager.getInstance() + .addFile(targetTsFileList.get(0).getTsFile().length(), sequence); double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; LOGGER.info( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java index d192c0f6d7..373305980f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java @@ -215,10 +215,10 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { @Override public void updateStartTimeAndEndTime(String device, long time, int subTaskId) { - synchronized (this) { - int fileIndex = seqFileIndexArray[subTaskId]; + int fileIndex = seqFileIndexArray[subTaskId]; + // we need to synchronized here to avoid multi-thread competition in sub-task + synchronized (targetTsFileResources.get(fileIndex)) { TsFileResource resource = targetTsFileResources.get(fileIndex); - // we need to synchronized here to avoid multi-thread competition in sub-task resource.updateStartTime(device, time); resource.updateEndTime(device, time); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 59df2d878f..37395e1207 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.StorageEngineV2; +import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.engine.compaction.CompactionRecoverManager; import org.apache.iotdb.db.engine.compaction.CompactionScheduler; import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; @@ -461,6 +462,7 @@ public class DataRegion { while (!value.isEmpty()) { TsFileResource tsFileResource = value.get(value.size() - 1); if (tsFileResource.resourceFileExists()) { + TsFileMetricManager.getInstance().addFile(tsFileResource.getTsFile().length(), true); break; } else { value.remove(value.size() - 1); @@ -477,6 +479,7 @@ public class DataRegion { while (!value.isEmpty()) { TsFileResource tsFileResource = value.get(value.size() - 1); if (tsFileResource.resourceFileExists()) { + TsFileMetricManager.getInstance().addFile(tsFileResource.getTsFile().length(), false); break; } else { value.remove(value.size() - 1); @@ -749,6 +752,8 @@ public class DataRegion { logger.error("Fail to close TsFile {} when recovering", tsFileResource.getTsFile(), e); } tsFileResourceManager.registerSealedTsFileResource(tsFileResource); + TsFileMetricManager.getInstance() + .addFile(tsFileResource.getTsFile().length(), recoverPerformer.isSequence()); } else { // the last file is not closed, continue writing to it RestorableTsFileIOWriter writer = recoverPerformer.getWriter(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 735834d551..7036be7e0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.CompressionRatio; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.engine.flush.CloseFileListener; import org.apache.iotdb.db.engine.flush.FlushListener; import org.apache.iotdb.db.engine.flush.FlushManager; @@ -1461,6 +1462,7 @@ public class TsFileProcessor { writer.getFile().length(), closeEndTime - closeStartTime); } + TsFileMetricManager.getInstance().addFile(tsFileResource.getTsFile().length(), sequence); writer = null; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java index 8928dabbd7..8ebb8f130c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java @@ -21,9 +21,8 @@ package org.apache.iotdb.db.service.metrics.predefined; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.service.metrics.enums.Metric; import org.apache.iotdb.db.service.metrics.enums.Tag; import org.apache.iotdb.db.wal.WALManager; @@ -138,22 +137,8 @@ public class FileMetrics implements IMetricSet { String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); String[] walDirs = CommonDescriptor.getInstance().getConfig().getWalDirs(); walFileTotalSize = WALManager.getInstance().getTotalDiskUsage(); - sequenceFileTotalSize = - Stream.of(dataDirs) - .mapToLong( - dir -> { - dir += File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME; - return FileUtils.getDirSize(dir); - }) - .sum(); - unsequenceFileTotalSize = - Stream.of(dataDirs) - .mapToLong( - dir -> { - dir += File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME; - return FileUtils.getDirSize(dir); - }) - .sum(); + sequenceFileTotalSize = TsFileMetricManager.getInstance().getFileSize(true); + unsequenceFileTotalSize = TsFileMetricManager.getInstance().getFileSize(false); walFileTotalCount = Stream.of(walDirs) .mapToLong( @@ -185,44 +170,8 @@ public class FileMetrics implements IMetricSet { } }) .sum(); - sequenceFileTotalCount = - Stream.of(dataDirs) - .mapToLong( - dir -> { - dir += File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME; - File folder = new File(dir); - if (folder.exists()) { - try { - return org.apache.commons.io.FileUtils.listFiles( - new File(dir), new String[] {"tsfile"}, true) - .size(); - } catch (UncheckedIOException exception) { - // do nothing - logger.debug("Failed when count sequence tsfile: ", exception); - } - } - return 0L; - }) - .sum(); - unsequenceFileTotalCount = - Stream.of(dataDirs) - .mapToLong( - dir -> { - dir += File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME; - File folder = new File(dir); - if (folder.exists()) { - try { - return org.apache.commons.io.FileUtils.listFiles( - new File(dir), new String[] {"tsfile"}, true) - .size(); - } catch (UncheckedIOException exception) { - // do nothing - logger.debug("Failed when count unsequence tsfile: ", exception); - } - } - return 0L; - }) - .sum(); + sequenceFileTotalCount = TsFileMetricManager.getInstance().getFileNum(true); + unsequenceFileTotalCount = TsFileMetricManager.getInstance().getFileNum(false); } public long getWalFileTotalSize() {
