This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164-0.13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7929fb9a6fd1e29cb51bc98d8031e9f853d4594a Author: Liu Xuxin <[email protected]> AuthorDate: Mon Aug 29 15:33:55 2022 +0800 add memory control framework --- .../org/apache/iotdb/cluster/ClusterIoTDB.java | 6 +- .../resources/conf/iotdb-engine.properties | 11 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 23 +- .../compaction/cross/CrossCompactionStrategy.java | 6 +- .../cross/CrossSpaceCompactionTaskFactory.java | 6 +- .../RewriteCrossSpaceCompactionSelector.java | 14 +- .../selector/ICrossSpaceMergeFileSelector.java | 2 + .../selector/RewriteCompactionFileSelector.java | 6 + .../task/RewriteCrossCompactionRecoverTask.java | 467 --------------------- .../task/RewriteCrossSpaceCompactionTask.java | 13 +- .../inner/utils/InnerSpaceCompactionUtils.java | 38 -- .../iotdb/db/rescon/PrimitiveArrayManager.java | 2 +- .../org/apache/iotdb/db/rescon/SystemInfo.java | 28 +- .../engine/compaction/CompactionSchedulerTest.java | 13 +- .../compaction/CompactionTaskManagerTest.java | 3 +- .../compaction/cross/CrossSpaceCompactionTest.java | 9 +- .../cross/CrossSpaceCompactionValidationTest.java | 351 ++++++++++++++-- .../cross/RewriteCrossSpaceCompactionTest.java | 12 +- .../task/FakedCrossSpaceCompactionTaskFactory.java | 47 --- 20 files changed, 454 insertions(+), 643 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java index b15635b4a1..cedb74b603 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java @@ -298,13 +298,13 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { if (clusterConfig.getReplicationNum() > 1) { clusterConfig.setMaxMemorySizeForRaftLog( (long) - (config.getAllocateMemoryForWrite() + (config.getAllocateMemoryForStorageEngine() * clusterConfig.getRaftLogMemoryProportion() / 
clusterConfig.getReplicationNum())); // calculate remaining memory allocated for write process - config.setAllocateMemoryForWrite( + config.setAllocateMemoryForStorageEngine( (long) - (config.getAllocateMemoryForWrite() + (config.getAllocateMemoryForStorageEngine() * (1 - clusterConfig.getRaftLogMemoryProportion()))); } return true; diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index ebd1a35242..3387f0107a 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -345,6 +345,10 @@ timestamp_precision=ms # If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:2 # write_read_schema_free_memory_proportion=4:3:1:2 +# Memory allocation ratio in StorageEngine: Write, Compaction +# The parameter form is a:b, where a and b are integers, for example: 8:2, 7:3 +# storage_engine_memory_proportion=8:2 + # primitive array size (length of each array) in array pool # Datatype: int # primitive_array_size=32 @@ -479,13 +483,6 @@ timestamp_precision=ms # Datatype: long, Unit: ms # cross_compaction_file_selection_time_budget=30000 -# How much memory may be used in ONE merge task, 10% of maximum JVM memory by default. -# This is only a rough estimation, starting from a relatively small value to avoid OOM. -# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the -# total memory estimation of merge. -# Datatype: long, Unit: Byte -# cross_compaction_memory_budget=268435456 - # How many threads will be set up to perform compaction, 10 by default. # Set to 1 when less than or equal to 0. 
# Datatype: int diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c3e0bdcde9..a3448ba496 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -106,7 +106,7 @@ public class IoTDBConfig { private int rpcMaxConcurrentClientNum = 65535; /** Memory allocated for the write process */ - private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 / 10; + private long allocateMemoryForStorageEngine = Runtime.getRuntime().maxMemory() * 4 / 10; /** Memory allocated for the read process */ private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10; @@ -125,6 +125,12 @@ public class IoTDBConfig { /** Memory allocated proportion for timeIndex */ private double timeIndexMemoryProportion = 0.2; + /** The proportion of write memory for write process */ + private double writeProportion = 0.8; + + /** The proportion of write memory for compaction */ + private double compactionProportion = 0.2; + /** Flush proportion for system */ private double flushProportion = 0.4; @@ -1531,14 +1537,6 @@ public class IoTDBConfig { this.chunkBufferPoolEnable = chunkBufferPoolEnable; } - public long getCrossCompactionMemoryBudget() { - return crossCompactionMemoryBudget; - } - - public void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) { - this.crossCompactionMemoryBudget = crossCompactionMemoryBudget; - } - public long getMergeIntervalSec() { return mergeIntervalSec; } @@ -1587,12 +1585,12 @@ public class IoTDBConfig { this.storageGroupSizeReportThreshold = storageGroupSizeReportThreshold; } - public long getAllocateMemoryForWrite() { - return allocateMemoryForWrite; + public long getAllocateMemoryForStorageEngine() { + return allocateMemoryForStorageEngine; } - public void setAllocateMemoryForWrite(long allocateMemoryForWrite) { - 
this.allocateMemoryForWrite = allocateMemoryForWrite; + public void setAllocateMemoryForStorageEngine(long allocateMemoryForStorageEngine) { + this.allocateMemoryForStorageEngine = allocateMemoryForStorageEngine; } public long getAllocateMemoryForSchema() { @@ -2765,4 +2763,20 @@ public class IoTDBConfig { public void setSchemaQueryFetchSize(int schemaQueryFetchSize) { this.schemaQueryFetchSize = schemaQueryFetchSize; } + + public double getWriteProportion() { + return writeProportion; + } + + public void setWriteProportion(double writeProportion) { + this.writeProportion = writeProportion; + } + + public double getCompactionProportion() { + return compactionProportion; + } + + public void setCompactionProportion(double compactionProportion) { + this.compactionProportion = compactionProportion; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index cdb0057a2d..8629974e7d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -552,11 +552,6 @@ public class IoTDBDescriptor { Integer.parseInt( properties.getProperty( "upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum())))); - conf.setCrossCompactionMemoryBudget( - Long.parseLong( - properties.getProperty( - "cross_compaction_memory_budget", - Long.toString(conf.getCrossCompactionMemoryBudget())))); conf.setCrossCompactionFileSelectionTimeBudget( Long.parseLong( properties.getProperty( @@ -1407,7 +1402,7 @@ public class IoTDBDescriptor { } long maxMemoryAvailable = Runtime.getRuntime().maxMemory(); if (proportionSum != 0) { - conf.setAllocateMemoryForWrite( + conf.setAllocateMemoryForStorageEngine( maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum); conf.setAllocateMemoryForRead( maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum); @@ -1417,7 +1412,7 
@@ public class IoTDBDescriptor { } logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead()); - logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForWrite()); + logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine()); logger.info("allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema()); conf.setMaxQueryDeduplicatedPathNum( @@ -1457,6 +1452,7 @@ public class IoTDBDescriptor { } } } + initStorageEngineAllocate(properties); } @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero @@ -1548,6 +1544,19 @@ public class IoTDBDescriptor { "cqlog_buffer_size", Integer.toString(conf.getCqlogBufferSize())))); } + private void initStorageEngineAllocate(Properties properties) { + String allocationRatio = properties.getProperty("storage_engine_memory_proportion", "8:2"); + String[] proportions = allocationRatio.split(":"); + int proportionForMemTable = Integer.parseInt(proportions[0].trim()); + int proportionForCompaction = Integer.parseInt(proportions[1].trim()); + conf.setWriteProportion( + ((double) (proportionForMemTable) + / (double) (proportionForCompaction + proportionForMemTable))); + conf.setCompactionProportion( + ((double) (proportionForCompaction) + / (double) (proportionForCompaction + proportionForMemTable))); + } + /** Get default encode algorithm by data type */ public TSEncoding getDefaultEncodingByType(TSDataType dataType) { switch (dataType) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java index 05ae8acf5a..257d1ab3e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java @@ -42,7 +42,8 @@ public enum CrossCompactionStrategy { long timePartitionId, TsFileManager tsFileManager, 
List<TsFileResource> selectedSeqTsFileResourceList, - List<TsFileResource> selectedUnSeqTsFileResourceList) { + List<TsFileResource> selectedUnSeqTsFileResourceList, + long memoryCost) { switch (this) { case REWRITE_COMPACTION: default: @@ -53,7 +54,8 @@ public enum CrossCompactionStrategy { tsFileManager, selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, - CompactionTaskManager.currentTaskNum); + CompactionTaskManager.currentTaskNum, + memoryCost); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java index 3c98410ef3..1f68587e8b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java @@ -34,7 +34,8 @@ public class CrossSpaceCompactionTaskFactory { long timePartitionId, TsFileManager tsFileManager, List<TsFileResource> selectedSeqTsFileResourceList, - List<TsFileResource> selectedUnSeqTsFileResourceList) { + List<TsFileResource> selectedUnSeqTsFileResourceList, + long memoryCost) { return IoTDBDescriptor.getInstance() .getConfig() .getCrossCompactionStrategy() @@ -44,6 +45,7 @@ public class CrossSpaceCompactionTaskFactory { timePartitionId, tsFileManager, selectedSeqTsFileResourceList, - selectedUnSeqTsFileResourceList); + selectedUnSeqTsFileResourceList, + memoryCost); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java index 452e4d0bf5..9599296f15 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.MergeException; +import org.apache.iotdb.db.rescon.SystemInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +89,9 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa if (seqFileList.isEmpty() || unSeqFileList.isEmpty()) { return; } - long budget = config.getCrossCompactionMemoryBudget(); + long budget = + SystemInfo.getInstance().getMemorySizeForCompaction() + / config.getConcurrentCompactionThread(); long timeLowerBound = System.currentTimeMillis() - Long.MAX_VALUE; CrossSpaceCompactionResource mergeResource = new CrossSpaceCompactionResource(seqFileList, unSeqFileList, timeLowerBound); @@ -97,6 +100,7 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa InnerSpaceCompactionUtils.getCrossSpaceFileSelector(budget, mergeResource); try { List[] mergeFiles = fileSelector.select(); + List<Long> memoryCost = fileSelector.getMemoryCost(); // avoid pending tasks holds the metadata and streams mergeResource.clear(); if (mergeFiles.length == 0) { @@ -110,9 +114,10 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa return; } LOGGER.info( - "select files for cross compaction, sequence files: {}, unsequence files {}", + "select files for cross compaction, sequence files: {}, unsequence files {}, memory cost is {}", mergeFiles[0], - mergeFiles[1]); + mergeFiles[1], + memoryCost.get(0)); if (mergeFiles[0].size() > 0 && mergeFiles[1].size() > 0) { AbstractCompactionTask compactionTask = @@ -122,7 +127,8 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa timePartition, 
tsFileManager, mergeFiles[0], - mergeFiles[1]); + mergeFiles[1], + memoryCost.get(0)); CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask); LOGGER.info( "{} [Compaction] submit a task with {} sequence file and {} unseq files", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java index b0d36564ad..70e849f6d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java @@ -31,5 +31,7 @@ public interface ICrossSpaceMergeFileSelector { List[] select() throws MergeException; + List<Long> getMemoryCost(); + int getConcurrentMergeNum(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java index 9c0009a4bb..d4da01db23 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -399,4 +400,9 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect public int getConcurrentMergeNum() { return concurrentMergeNum; } + + @Override + public List<Long> getMemoryCost() { + return Collections.singletonList(totalCost); + } } diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java deleted file mode 100644 index 3c7084b8f8..0000000000 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java +++ /dev/null @@ -1,467 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iotdb.db.engine.compaction.cross.rewrite.task; - -import org.apache.iotdb.db.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.compaction.CompactionUtils; -import org.apache.iotdb.db.engine.compaction.TsFileIdentifier; -import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils; -import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask; -import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer; -import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger; -import org.apache.iotdb.db.engine.modification.Modification; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.engine.storagegroup.TsFileManager; -import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; -import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; -import org.apache.iotdb.db.utils.FileLoaderUtils; -import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; -import org.apache.iotdb.tsfile.read.TsFileSequenceReader; - -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompactionTask { - private final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); - private File compactionLogFile; - - public RewriteCrossCompactionRecoverTask( - String logicalStorageGroupName, - String virtualStorageGroupName, - long timePartitionId, - File logFile, - AtomicInteger currentTaskNum, - TsFileManager tsFileManager) { - super( - 
logicalStorageGroupName, - virtualStorageGroupName, - timePartitionId, - tsFileManager, - null, - null, - currentTaskNum); - this.compactionLogFile = logFile; - } - - @Override - public void doCompaction() { - boolean handleSuccess = true; - LOGGER.info( - "{} [Compaction][Recover] cross space compaction log is {}", - fullStorageGroupName, - compactionLogFile); - try { - if (compactionLogFile.exists()) { - LOGGER.info( - "{} [Compaction][Recover] cross space compaction log file {} exists, start to recover it", - fullStorageGroupName, - compactionLogFile); - CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(compactionLogFile); - if (isOldLog()) { - // log from previous version (<0.13) - logAnalyzer.analyzeOldCrossCompactionLog(); - } else { - logAnalyzer.analyze(); - } - List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos(); - List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos(); - - // compaction log file is incomplete - if (targetFileIdentifiers.isEmpty() || sourceFileIdentifiers.isEmpty()) { - LOGGER.info( - "{} [Compaction][Recover] incomplete log file, abort recover", fullStorageGroupName); - return; - } - - // check is all source files existed - boolean isAllSourcesFileExisted = true; - for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) { - File sourceFile = sourceFileIdentifier.getFileFromDataDirs(); - if (sourceFile == null) { - isAllSourcesFileExisted = false; - break; - } - } - if (isAllSourcesFileExisted) { - if (logAnalyzer.isLogFromOld()) { - handleSuccess = handleWithAllSourceFilesExistFromOld(targetFileIdentifiers); - } else { - handleSuccess = - handleWithAllSourceFilesExist(targetFileIdentifiers, sourceFileIdentifiers); - } - } else { - if (logAnalyzer.isLogFromOld()) { - handleSuccess = - handleWithoutAllSourceFilesExistFromOld( - targetFileIdentifiers, sourceFileIdentifiers); - } else { - handleSuccess = handleWithoutAllSourceFilesExist(sourceFileIdentifiers); - } - 
} - } - } catch (IOException e) { - LOGGER.error("recover cross space compaction error", e); - } finally { - if (!handleSuccess) { - LOGGER.error( - "{} [Compaction][Recover] Failed to recover cross space compaction, set allowCompaction to false", - fullStorageGroupName); - tsFileManager.setAllowCompaction(false); - } else { - if (compactionLogFile.exists()) { - try { - LOGGER.info( - "{} [Compaction][Recover] Recover compaction successfully, delete log file {}", - fullStorageGroupName, - compactionLogFile); - FileUtils.delete(compactionLogFile); - } catch (IOException e) { - LOGGER.error( - "{} [Compaction][Recover] Exception occurs while deleting log file {}, set allowCompaction to false", - fullStorageGroupName, - compactionLogFile, - e); - tsFileManager.setAllowCompaction(false); - } - } - } - } - } - - /** - * All source files exist: (1) delete all the target files and tmp target files (2) delete - * compaction mods files. - */ - private boolean handleWithAllSourceFilesExist( - List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> sourceFileIdentifiers) { - LOGGER.info( - "{} [Compaction][Recover] all source files exists, delete all target files.", - fullStorageGroupName); - - for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) { - // xxx.cross - File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs(); - // xxx.tsfile - File targetFile = - getFileFromDataDirs( - targetFileIdentifier - .getFilePath() - .replace( - IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX, - TsFileConstant.TSFILE_SUFFIX)); - TsFileResource targetResource = null; - if (tmpTargetFile != null) { - targetResource = new TsFileResource(tmpTargetFile); - } else if (targetFile != null) { - targetResource = new TsFileResource(targetFile); - } - - if (targetResource != null && !targetResource.remove()) { - // failed to remove tmp target tsfile - // system should not carry out the subsequent compaction in case of data redundant - LOGGER.warn( - "{} 
[Compaction][Recover] failed to remove target file {}", - fullStorageGroupName, - targetResource); - return false; - } - } - - // delete compaction mods files - List<TsFileResource> sourceTsFileResourceList = new ArrayList<>(); - for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) { - sourceTsFileResourceList.add(new TsFileResource(sourceFileIdentifier.getFileFromDataDirs())); - } - try { - CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, Collections.emptyList()); - } catch (Throwable e) { - LOGGER.error( - "{} [Compaction][Recover] Exception occurs while deleting compaction mods file, set allowCompaction to false", - fullStorageGroupName, - e); - return false; - } - return true; - } - - /** - * Some source files lost: delete remaining source files, encluding: tsfile, resource file, mods - * file and compaction mods file. - */ - private boolean handleWithoutAllSourceFilesExist(List<TsFileIdentifier> sourceFileIdentifiers) { - // some source files have been deleted, while target file must exist. - boolean handleSuccess = true; - List<TsFileResource> remainSourceTsFileResources = new ArrayList<>(); - for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) { - File sourceFile = sourceFileIdentifier.getFileFromDataDirs(); - if (sourceFile != null) { - TsFileResource resource = new TsFileResource(sourceFile); - resource.setStatus(TsFileResourceStatus.CLOSED); - remainSourceTsFileResources.add(resource); - } else { - // if source file does not exist, its resource file may still exist, so delete it. 
- File resourceFile = - getFileFromDataDirs( - sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX); - if (resourceFile != null && !resourceFile.delete()) { - LOGGER.error( - "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness", - fullStorageGroupName, - resourceFile); - handleSuccess = false; - } - } - // delete .compaction.mods file and .mods file of all source files - File compactionModFile = - getFileFromDataDirs( - sourceFileIdentifier.getFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX); - File modFile = - getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX); - if (compactionModFile != null && !compactionModFile.delete()) { - LOGGER.error( - "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness", - fullStorageGroupName, - compactionModFile); - handleSuccess = false; - } - if (modFile != null && !modFile.delete()) { - LOGGER.error( - "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness", - fullStorageGroupName, - modFile); - handleSuccess = false; - } - } - // delete remaining source files - if (!InnerSpaceCompactionUtils.deleteTsFilesInDisk( - remainSourceTsFileResources, fullStorageGroupName)) { - LOGGER.error( - "{} [Compaction][Recover] fail to delete remaining source files.", fullStorageGroupName); - handleSuccess = false; - } - return handleSuccess; - } - - /** Delete tmp target file and compaction mods file. 
*/ - private boolean handleWithAllSourceFilesExistFromOld( - List<TsFileIdentifier> targetFileIdentifiers) { - // delete tmp target file - for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) { - // xxx.tsfile.merge - File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs(); - if (tmpTargetFile != null) { - tmpTargetFile.delete(); - } - } - File compactionModsFileFromOld = - new File( - tsFileManager.getStorageGroupDir() - + File.separator - + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD); - if (compactionModsFileFromOld.exists() && !compactionModsFileFromOld.delete()) { - LOGGER.error( - "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness", - fullStorageGroupName, - compactionModsFileFromOld); - return false; - } - return true; - } - - /** - * 1. If target file does not exist, then move .merge file to target file <br> - * 2. If target resource file does not exist, then serialize it. <br> - * 3. Append merging modification to target mods file and delete merging mods file. <br> - * 4. Delete source files and .merge file. 
<br> - */ - private boolean handleWithoutAllSourceFilesExistFromOld( - List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> sourceFileIdentifiers) { - try { - File compactionModsFileFromOld = - new File( - tsFileManager.getStorageGroupDir() - + File.separator - + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD); - List<TsFileResource> targetFileResources = new ArrayList<>(); - for (int i = 0; i < sourceFileIdentifiers.size(); i++) { - TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i); - if (sourceFileIdentifier.isSequence()) { - File tmpTargetFile = targetFileIdentifiers.get(i).getFileFromDataDirs(); - File targetFile = null; - - // move tmp target file to target file if not exist - if (tmpTargetFile != null) { - // move tmp target file to target file - String sourceFilePath = - tmpTargetFile - .getPath() - .replace( - TsFileConstant.TSFILE_SUFFIX - + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD, - TsFileConstant.TSFILE_SUFFIX); - targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new File(sourceFilePath)); - FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, targetFile); - } else { - // target file must exist - File file = - TsFileNameGenerator.increaseCrossCompactionCnt( - new File( - targetFileIdentifiers - .get(i) - .getFilePath() - .replace( - TsFileConstant.TSFILE_SUFFIX - + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD, - TsFileConstant.TSFILE_SUFFIX))); - - targetFile = getFileFromDataDirs(file.getPath()); - } - if (targetFile == null) { - LOGGER.error( - "{} [Compaction][Recover] target file of source seq file {} does not exist (<0.13).", - fullStorageGroupName, - sourceFileIdentifier.getFilePath()); - return false; - } - - // serialize target resource file if not exist - TsFileResource targetResource = new TsFileResource(targetFile); - if (!targetResource.resourceFileExists()) { - try (TsFileSequenceReader reader = - new TsFileSequenceReader(targetFile.getAbsolutePath())) { - 
FileLoaderUtils.updateTsFileResource(reader, targetResource); - } - targetResource.serialize(); - } - - targetFileResources.add(targetResource); - - // append compaction modifications to target mods file and delete compaction mods file - if (compactionModsFileFromOld.exists()) { - ModificationFile compactionModsFile = - new ModificationFile(compactionModsFileFromOld.getPath()); - appendCompactionModificationsFromOld(targetResource, compactionModsFile); - } - - // delete tmp target file - if (tmpTargetFile != null) { - tmpTargetFile.delete(); - } - } - - // delete source tsfile - File sourceFile = sourceFileIdentifier.getFileFromDataDirs(); - if (sourceFile != null) { - sourceFile.delete(); - } - - // delete source resource file - sourceFile = - getFileFromDataDirs( - sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX); - if (sourceFile != null) { - sourceFile.delete(); - } - - // delete source mods file - sourceFile = - getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX); - if (sourceFile != null) { - sourceFile.delete(); - } - } - - // delete compaction mods file - if (compactionModsFileFromOld.exists() && !compactionModsFileFromOld.delete()) { - LOGGER.error( - "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness", - fullStorageGroupName, - compactionModsFileFromOld); - return false; - } - } catch (Throwable e) { - LOGGER.error( - "{} [Compaction][Recover] fail to handle with some source files lost from old version.", - fullStorageGroupName, - e); - return false; - } - - return true; - } - - public static void appendCompactionModificationsFromOld( - TsFileResource resource, ModificationFile compactionModsFile) throws IOException { - - if (compactionModsFile != null) { - for (Modification modification : compactionModsFile.getModifications()) { - // we have to set modification offset to MAX_VALUE, as the offset of source chunk may - // change after compaction - 
modification.setFileOffset(Long.MAX_VALUE); - resource.getModFile().write(modification); - } - resource.getModFile().close(); - } - } - - /** - * This method find the File object of given filePath by searching it in every data directory. If - * the file is not found, it will return null. - */ - private File getFileFromDataDirs(String filePath) { - String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); - for (String dataDir : dataDirs) { - File f = new File(dataDir, filePath); - if (f.exists()) { - return f; - } - } - return null; - } - - @Override - public boolean equalsOtherTask(AbstractCompactionTask other) { - if (other instanceof RewriteCrossCompactionRecoverTask) { - return compactionLogFile.equals( - ((RewriteCrossCompactionRecoverTask) other).compactionLogFile); - } - return false; - } - - @Override - public boolean checkValidAndSetMerging() { - return compactionLogFile.exists(); - } - - /** Return whether compaction log file is from previous version (<0.13). 
*/ - private boolean isOldLog() { - return compactionLogFile.getName().equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD); - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java index dc24ed005a..e526893381 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.commons.io.FileUtils; @@ -55,6 +56,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio protected TsFileResourceList seqTsFileResourceList; protected TsFileResourceList unseqTsFileResourceList; private File logFile; + private long memoryCost = -1; private List<TsFileResource> targetTsfileResourceList; private List<TsFileResource> holdReadLockList = new ArrayList<>(); @@ -67,7 +69,8 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio TsFileManager tsFileManager, List<TsFileResource> selectedSeqTsFileResourceList, List<TsFileResource> selectedUnSeqTsFileResourceList, - AtomicInteger currentTaskNum) { + AtomicInteger currentTaskNum, + long memoryCost) { super( logicalStorageGroupName + "-" + virtualStorageGroupName, timePartitionId, @@ -79,10 +82,17 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio 
this.selectedUnSeqTsFileResourceList = selectedUnSeqTsFileResourceList; this.seqTsFileResourceList = tsFileManager.getSequenceListByTimePartition(timePartition); this.unseqTsFileResourceList = tsFileManager.getUnsequenceListByTimePartition(timePartition); + this.memoryCost = memoryCost; } @Override protected void doCompaction() throws Exception { + try { + SystemInfo.getInstance().addCompactionMemoryCost(memoryCost); + } catch (InterruptedException e) { + logger.error("Thread get interrupted when allocating memory for compaction", e); + return; + } try { executeCompaction(); } catch (Throwable throwable) { @@ -100,6 +110,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio true); throw throwable; } finally { + SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost); releaseAllLock(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java index 1f48abf35d..e7590e5589 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceM import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -43,7 +42,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import 
org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +49,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.LinkedList; import java.util.List; @@ -171,34 +168,6 @@ public class InnerSpaceCompactionUtils { } } - /** - * This method is called to recover modifications while an exception occurs during compaction. It - * append new modifications of each selected tsfile to its corresponding old mods file and delete - * the compaction mods file. - * - * @param selectedTsFileResources - * @throws IOException - */ - public static void appendNewModificationsToOldModsFile( - List<TsFileResource> selectedTsFileResources) throws IOException { - for (TsFileResource sourceFile : selectedTsFileResources) { - // if there are modifications to this seqFile during compaction - if (sourceFile.getCompactionModFile().exists()) { - ModificationFile compactionModificationFile = - ModificationFile.getCompactionMods(sourceFile); - Collection<Modification> newModification = compactionModificationFile.getModifications(); - compactionModificationFile.close(); - // write the new modifications to its old modification file - try (ModificationFile oldModificationFile = sourceFile.getModFile()) { - for (Modification modification : newModification) { - oldModificationFile.write(modification); - } - } - FileUtils.delete(new File(ModificationFile.getCompactionMods(sourceFile).getFilePath())); - } - } - } - /** * Collect all the compaction modification files of source files, and combines them as the * modification file of target file. 
@@ -248,13 +217,6 @@ public class InnerSpaceCompactionUtils { } } - public static class TsFileNameComparator implements Comparator<TsFileSequenceReader> { - - @Override - public int compare(TsFileSequenceReader o1, TsFileSequenceReader o2) { - return TsFileManager.compareFileName(new File(o1.getFileName()), new File(o2.getFileName())); - } - } /** * Update the targetResource. Move xxx.target to xxx.tsfile and serialize xxx.tsfile.resource . * diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java index 0f5b667c49..b156483116 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java @@ -48,7 +48,7 @@ public class PrimitiveArrayManager { /** threshold total size of arrays for all data types */ private static final double POOLED_ARRAYS_MEMORY_THRESHOLD = - CONFIG.getAllocateMemoryForWrite() + CONFIG.getAllocateMemoryForStorageEngine() * CONFIG.getBufferedArraysMemoryProportion() / AMPLIFICATION_FACTOR; diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index 334bb965ca..8f3fa6c48e 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; public class SystemInfo { @@ -43,10 +44,14 @@ public class SystemInfo { private long totalStorageGroupMemCost = 0L; private volatile boolean rejected = false; - private static long memorySizeForWrite = config.getAllocateMemoryForWrite(); + private static long memorySizeForWrite = + (long) (config.getAllocateMemoryForStorageEngine() * 
config.getWriteProportion()); + private static long memorySizeForCompaction = + (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion()); private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); private long flushingMemTablesCost = 0L; + private AtomicLong compactionMemoryCost = new AtomicLong(0L); private ExecutorService flushTaskSubmitThreadPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool"); @@ -269,4 +274,25 @@ public class SystemInfo { public int flushingMemTableNum() { return FlushManager.getInstance().getNumberOfWorkingTasks(); } + + public void addCompactionMemoryCost(long memoryCost) throws InterruptedException { + long originSize = this.compactionMemoryCost.get(); + while (originSize + memoryCost > memorySizeForCompaction + || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) { + Thread.sleep(100); + originSize = this.compactionMemoryCost.get(); + } + } + + public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) { + this.compactionMemoryCost.addAndGet(-compactionMemoryCost); + } + + public long getMemorySizeForCompaction() { + return memorySizeForCompaction; + } + + public void setMemorySizeForCompaction(long size) { + memorySizeForCompaction = size; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java index 4aff8c0d9e..ddc3128376 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.path.PartialPath; 
+import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -285,9 +286,14 @@ public class CompactionSchedulerTest { int prevMaxCompactionCandidateFileNum = IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum(); IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100); - IoTDBDescriptor.getInstance() - .getConfig() - .setCrossCompactionMemoryBudget(2 * 1024 * 1024L * 1024L); + long originSize = SystemInfo.getInstance().getMemorySizeForCompaction(); + SystemInfo.getInstance() + .setMemorySizeForCompaction( + 2 + * 1024L + * 1024L + * 1024L + * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()); String sgName = COMPACTION_TEST_SG + "test2"; try { IoTDB.metaManager.setStorageGroup(new PartialPath(sgName)); @@ -388,6 +394,7 @@ public class CompactionSchedulerTest { IoTDBDescriptor.getInstance() .getConfig() .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum); + SystemInfo.getInstance().setMemorySizeForCompaction(originSize); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java index fa6c4386c2..32bb1e97a0 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java @@ -278,7 +278,8 @@ public class CompactionTaskManagerTest extends InnerCompactionTest { tsFileManager, seqResources, unseqResources, - new AtomicInteger(0)); + new AtomicInteger(0), + 0); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java index 852b66aa62..eadfd6b71e 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java @@ -438,7 +438,8 @@ public class CrossSpaceCompactionTest { "0", "target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"), mergeResource.getSeqFiles(), - mergeResource.getUnseqFiles()); + mergeResource.getUnseqFiles(), + 0); compactionTask.call(); List<TsFileResource> targetTsfileResourceList = new ArrayList<>(); for (TsFileResource seqResource : seqResources) { @@ -741,7 +742,8 @@ public class CrossSpaceCompactionTest { "0", "target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"), mergeResource.getSeqFiles(), - mergeResource.getUnseqFiles()); + mergeResource.getUnseqFiles(), + 0); compactionTask.call(); List<TsFileResource> targetTsfileResourceList = new ArrayList<>(); for (TsFileResource seqResource : seqResources.subList(1, 4)) { @@ -1043,7 +1045,8 @@ public class CrossSpaceCompactionTest { "0", "target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"), mergeResource.getSeqFiles(), - mergeResource.getUnseqFiles()); + mergeResource.getUnseqFiles(), + 0); compactionTask.call(); List<TsFileResource> targetTsfileResourceList = new ArrayList<>(); for (TsFileResource seqResource : seqResources.subList(1, 4)) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java index 83ac4b538b..95fcd6d0b3 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java +++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java @@ -97,7 +97,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -134,7 +141,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -171,7 +185,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -216,7 +237,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(3), unseqResources.get(3)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -258,7 +286,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new 
RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -298,7 +333,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(2), unseqResources.get(2)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -341,7 +383,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(3), unseqResources.get(3)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -383,7 +432,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(3), unseqResources.get(3)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -424,7 +480,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -466,7 +529,14 @@ public class 
CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -508,7 +578,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -550,7 +627,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -593,7 +677,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -637,7 +728,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + 
tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -681,7 +779,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -726,7 +831,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -772,7 +884,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -818,7 +937,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -864,7 +990,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( 
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -910,7 +1043,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -955,7 +1095,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -996,7 +1143,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1038,7 +1192,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1080,7 +1241,14 @@ public class CrossSpaceCompactionValidationTest 
extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1122,7 +1290,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1165,7 +1340,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1209,7 +1391,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1253,7 +1442,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + 
result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1298,7 +1494,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1344,7 +1547,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1390,7 +1600,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1436,7 +1653,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1482,7 +1706,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", 
COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1527,7 +1758,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1570,7 +1808,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1614,7 +1859,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[0].get(1), seqResources.get(3)); Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1658,7 +1910,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1702,7 
+1961,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); Assert.assertEquals(result[1].get(1), unseqResources.get(1)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); @@ -1747,7 +2013,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest { Assert.assertEquals(result[1].get(0), unseqResources.get(0)); new RewriteCrossSpaceCompactionTask( - "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0)) + "0", + COMPACTION_TEST_SG, + 0, + tsFileManager, + result[0], + result[1], + new AtomicInteger(0), + 0) .call(); validateSeqFiles(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java index 040134b4d4..9aa3d9b76f 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java @@ -227,7 +227,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest { tsFileManager, seqResources, unseqResources, - new AtomicInteger(0)); + new AtomicInteger(0), + 0); rewriteCrossSpaceCompactionTask.call(); for (TsFileResource resource : seqResources) { @@ -464,7 +465,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest { tsFileManager, seqResources, unseqResources, - new AtomicInteger(0)); + new AtomicInteger(0), + 0); rewriteCrossSpaceCompactionTask.call(); for (TsFileResource resource : seqResources) { @@ -611,7 +613,8 @@ public class 
RewriteCrossSpaceCompactionTest extends AbstractCompactionTest { vsgp.getTsFileResourceManager(), seqResources, unseqResources, - new AtomicInteger(0)); + new AtomicInteger(0), + 0); rewriteCrossSpaceCompactionTask.setSourceFilesToCompactionCandidate(); rewriteCrossSpaceCompactionTask.checkValidAndSetMerging(); // delete data in source file during compaction @@ -731,7 +734,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest { vsgp.getTsFileResourceManager(), seqResources, unseqResources, - new AtomicInteger(0)); + new AtomicInteger(0), + 0); rewriteCrossSpaceCompactionTask.setSourceFilesToCompactionCandidate(); rewriteCrossSpaceCompactionTask.checkValidAndSetMerging(); // delete data in source file during compaction diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedCrossSpaceCompactionTaskFactory.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedCrossSpaceCompactionTaskFactory.java deleted file mode 100644 index 1fb4b2e3fc..0000000000 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedCrossSpaceCompactionTaskFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.db.engine.compaction.task; - -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.storagegroup.TsFileManager; -import org.apache.iotdb.db.engine.storagegroup.TsFileResource; - -import java.util.List; - -public class FakedCrossSpaceCompactionTaskFactory { - public AbstractCompactionTask createTask( - String logicalStorageGroupName, - String virtualStorageGroupName, - long timePartitionId, - TsFileManager tsFileManager, - List<TsFileResource> selectedSeqTsFileResourceList, - List<TsFileResource> selectedUnSeqTsFileResourceList) { - return IoTDBDescriptor.getInstance() - .getConfig() - .getCrossCompactionStrategy() - .getCompactionTask( - logicalStorageGroupName, - virtualStorageGroupName, - timePartitionId, - tsFileManager, - selectedSeqTsFileResourceList, - selectedUnSeqTsFileResourceList); - } -}
