This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch clear_merge_code in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 20ce1dcb4860839a3509bde11e8e0467e2612301 Author: qiaojialin <[email protected]> AuthorDate: Tue Jun 1 20:21:59 2021 +0800 clean merge code --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +- ...PoolManager.java => CompactionTaskManager.java} | 12 +- .../db/engine/compaction/TsFileManagement.java | 109 +++--------- .../level/LevelCompactionTsFileManagement.java | 191 +++++++++------------ .../iotdb/db/engine/merge/manage/MergeManager.java | 2 +- .../db/engine/merge/manage/MergeResource.java | 48 ++++-- .../iotdb/db/engine/merge/recover/LogAnalyzer.java | 7 +- .../engine/merge/selector/IMergeFileSelector.java | 4 + .../iotdb/db/engine/merge/task/MergeTask.java | 183 ++++++++++---------- .../db/engine/merge/task/RecoverMergeTask.java | 1 + .../engine/storagegroup/StorageGroupProcessor.java | 52 +++--- .../java/org/apache/iotdb/db/service/IoTDB.java | 4 +- .../apache/iotdb/db/engine/merge/MergeLogTest.java | 3 +- .../iotdb/db/engine/merge/MergeManagerTest.java | 11 +- .../iotdb/db/engine/merge/MergeOverLapTest.java | 3 +- .../iotdb/db/engine/merge/MergePerfTest.java | 4 +- .../iotdb/db/engine/merge/MergeTaskTest.java | 33 ++-- .../storagegroup/StorageGroupProcessorTest.java | 2 +- .../iotdb/db/integration/IoTDBRestartIT.java | 4 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +- 21 files changed, 302 insertions(+), 389 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 6f9498b..61b8997 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -519,7 +519,7 @@ public class IoTDBConfig { * despite how much they are overflowed). This may increase merge overhead depending on how much * the SeqFiles are overflowed. */ - private boolean forceFullMerge = true; + private boolean fullMerge = true; /** The limit of compaction merge can reach per second */ private int mergeWriteThroughputMbPerSec = 8; @@ -1404,12 +1404,12 @@ public class IoTDBConfig { this.enablePartialInsert = enablePartialInsert; } - public boolean isForceFullMerge() { - return forceFullMerge; + public boolean isFullMerge() { + return fullMerge; } - void setForceFullMerge(boolean forceFullMerge) { - this.forceFullMerge = forceFullMerge; + void setFullMerge(boolean fullMerge) { + this.fullMerge = fullMerge; } public int getCompactionThreadNum() { diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 645a534..6077a3d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -479,10 +479,10 @@ public class IoTDBDescriptor { Long.parseLong( properties.getProperty( "merge_interval_sec", Long.toString(conf.getMergeIntervalSec())))); - conf.setForceFullMerge( + conf.setFullMerge( Boolean.parseBoolean( properties.getProperty( - "force_full_merge", Boolean.toString(conf.isForceFullMerge())))); + "force_full_merge", Boolean.toString(conf.isFullMerge())))); conf.setCompactionThreadNum( Integer.parseInt( properties.getProperty( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java similarity index 93% rename from server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index cdf6f4a..53dc9cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -46,17 +46,17 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME; -/** CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction tasks. */ -public class CompactionMergeTaskPoolManager implements IService { +/** CompactionTaskManager provides a ThreadPool to queue and run all compaction tasks. */ +public class CompactionTaskManager implements IService { private static final Logger logger = - LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class); - private static final CompactionMergeTaskPoolManager INSTANCE = - new CompactionMergeTaskPoolManager(); + LoggerFactory.getLogger(CompactionTaskManager.class); + private static final CompactionTaskManager INSTANCE = + new CompactionTaskManager(); private ExecutorService pool; private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>(); - public static CompactionMergeTaskPoolManager getInstance() { + public static CompactionTaskManager getInstance() { return INSTANCE; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java index aea6e8b..439c50d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.engine.compaction; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; -import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.merge.manage.MergeResource; import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector; import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector; @@ -33,7 +32,6 @@ import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.exception.MergeException; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.slf4j.Logger; @@ -62,20 +60,16 @@ public abstract class TsFileManagement { /** Serialize queries, delete resource files, compaction cleanup files */ private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock(); - public volatile boolean isUnseqMerging = false; - public volatile boolean isSeqMerging = false; /** * This is the modification file of the result of the current merge. Because the merged file may * be invisible at this moment, without this, deletion/update during merge could be lost. */ public ModificationFile mergingModification; - private long mergeStartTime; - /** whether execute merge chunk in this task */ protected boolean isMergeExecutedInCurrentTask = false; - protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(); + protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isFullMerge(); private final int maxOpenFileNumInEachUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().getMaxOpenFileNumInEachUnseqCompaction(); @@ -84,7 +78,7 @@ public abstract class TsFileManagement { this.storageGroupDir = storageGroupDir; } - public void setForceFullMerge(boolean forceFullMerge) { + public void setFullMerge(boolean forceFullMerge) { isForceFullMerge = forceFullMerge; } @@ -176,11 +170,11 @@ public abstract class TsFileManagement { } } - public class CompactionRecoverTask implements Callable<Void> { + public class LevelCompactionRecoverTask implements Callable<Void> { private CloseCompactionMergeCallBack closeCompactionMergeCallBack; - public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) { + public LevelCompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) { this.closeCompactionMergeCallBack = closeCompactionMergeCallBack; } @@ -194,107 +188,63 @@ public abstract class TsFileManagement { } } - public synchronized void merge( + protected void doUnseqMerge( boolean fullMerge, List<TsFileResource> seqMergeList, - List<TsFileResource> unSeqMergeList, - long dataTTL) { - if (isUnseqMerging) { - if (logger.isInfoEnabled()) { - logger.info( - "{} Last merge is ongoing, currently consumed time: {}ms", - storageGroupName, - (System.currentTimeMillis() - mergeStartTime)); - } - return; - } - // wait until seq merge has finished - while (isSeqMerging) { - try { - wait(200); - } catch (InterruptedException e) { - logger.error("{} [Compaction] shutdown", storageGroupName, e); - Thread.currentThread().interrupt(); - return; - } - } - isUnseqMerging = true; - - if (seqMergeList.isEmpty()) { - logger.info("{} no seq files to be merged", storageGroupName); - isUnseqMerging = false; - return; - } + List<TsFileResource> unSeqMergeList) { - if (unSeqMergeList.isEmpty()) { - logger.info("{} no unseq files to be merged", storageGroupName); - isUnseqMerging = false; + if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) { + logger.info("{} no files to be merged, seqFiles={}, unseqFiles={}", + storageGroupName, seqMergeList.size(), unSeqMergeList.size()); return; } + // the number of unseq files in one merge should not exceed maxOpenFileNumInEachUnseqCompaction if (unSeqMergeList.size() > maxOpenFileNumInEachUnseqCompaction) { - logger.info( - "{} too much unseq files to be merged, reduce it to {}", - storageGroupName, - maxOpenFileNumInEachUnseqCompaction); unSeqMergeList = unSeqMergeList.subList(0, maxOpenFileNumInEachUnseqCompaction); } - long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget(); - long timeLowerBound = System.currentTimeMillis() - dataTTL; + long timeLowerBound = System.currentTimeMillis() - IoTDBDescriptor.getInstance().getConfig().getDefaultTTL(); MergeResource mergeResource = new MergeResource(seqMergeList, unSeqMergeList, timeLowerBound); - IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource); + IMergeFileSelector fileSelector = getMergeFileSelector(mergeResource); try { List[] mergeFiles = fileSelector.select(); if (mergeFiles.length == 0) { logger.info( - "{} cannot select merge candidates under the budget {}", storageGroupName, budget); - isUnseqMerging = false; + "{} cannot select merge candidates under the budget {}", + storageGroupName, IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget()); return; } - // avoid pending tasks holds the metadata and streams - mergeResource.clear(); - String taskName = storageGroupName + "-" + System.currentTimeMillis(); - // do not cache metadata until true candidates are chosen, or too much metadata will be - // cached during selection - mergeResource.setCacheDeviceMeta(true); - - for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) { - tsFileResource.setMerging(true); - } - for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) { - tsFileResource.setMerging(true); - } - mergeStartTime = System.currentTimeMillis(); + mergeResource.startMerging(); + MergeTask mergeTask = new MergeTask( mergeResource, storageGroupDir, this::mergeEndAction, - taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName); + mergingModification = new ModificationFile(storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME); - MergeManager.getINSTANCE().submitMainTask(mergeTask); - if (logger.isInfoEnabled()) { - logger.info( - "{} submits a merge task {}, merging {} seqFiles, {} unseqFiles", - storageGroupName, - taskName, - mergeFiles[0].size(), - mergeFiles[1].size()); - } - } catch (MergeException | IOException e) { + logger.info( + "{} start merge {} seqFiles, {} unseqFiles", storageGroupName, + mergeFiles[0].size(), + mergeFiles[1].size()); + + mergeTask.doMerge(); + + } catch (Exception e) { logger.error("{} cannot select file for merge", storageGroupName, e); } } - private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) { + private IMergeFileSelector getMergeFileSelector(MergeResource resource) { + long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget(); MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy(); switch (strategy) { case MAX_FILE_NUM: @@ -402,14 +352,12 @@ public abstract class TsFileManagement { if (Thread.currentThread().isInterrupted() || unseqFiles.isEmpty()) { // merge task abort, or merge runtime exception arose, just end this merge - isUnseqMerging = false; logger.info("{} a merge task abnormally ends", storageGroupName); return; } removeUnseqFiles(unseqFiles); - for (int i = 0; i < seqFiles.size(); i++) { - TsFileResource seqFile = seqFiles.get(i); + for (TsFileResource seqFile : seqFiles) { // get both seqFile lock and merge lock doubleWriteLock(seqFile); @@ -430,7 +378,6 @@ public abstract class TsFileManagement { try { removeMergingModification(); - isUnseqMerging = false; Files.delete(mergeLog.toPath()); } catch (IOException e) { logger.error( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java index 98f1c2e..0b6730d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java @@ -64,7 +64,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { private static final Logger logger = LoggerFactory.getLogger(LevelCompactionTsFileManagement.class); - private final int seqLevelNum = + private final int totalSeqLevelNum = Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1); private final int seqFileNumInEachLevel = Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1); @@ -267,7 +267,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { long timePartitionId = tsFileResource.getTimePartition(); int level = getMergeLevel(tsFileResource.getTsFile()); if (sequence) { - if (level <= seqLevelNum - 1) { + if (level <= totalSeqLevelNum - 1) { // current file has normal level sequenceTsFileResources .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources) @@ -277,7 +277,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { // current file has too high level sequenceTsFileResources .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources) - .get(seqLevelNum - 1) + .get(totalSeqLevelNum - 1) .add(tsFileResource); } } else { @@ -397,7 +397,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { if (sequence) { for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources.values()) { - for (int i = seqLevelNum - 1; i >= 0; i--) { + for (int i = totalSeqLevelNum - 1; i >= 0; i--) { result += partitionSequenceTsFileResource.get(i).size(); } } @@ -564,7 +564,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { forkTsFileList( forkedSequenceTsFileResources, sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources), - seqLevelNum); + totalSeqLevelNum); // we have to copy all unseq file forkTsFileList( forkedUnSequenceTsFileResources, @@ -594,47 +594,28 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { @Override protected void merge(long timePartition) { - isMergeExecutedInCurrentTask = - merge( - forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel); - if (enableUnseqCompaction - && unseqLevelNum <= 1 - && forkedUnSequenceTsFileResources.get(0).size() > 0) { - isMergeExecutedInCurrentTask = true; - merge( - isForceFullMerge, - getTsFileListByTimePartition(true, timePartition), - forkedUnSequenceTsFileResources.get(0), - Long.MAX_VALUE); - } else { - isMergeExecutedInCurrentTask = - merge( - forkedUnSequenceTsFileResources, - false, - timePartition, - unseqLevelNum, - unseqFileNumInEachLevel); + // do unseq compaction if has an unseq file in max unseq level + if (enableUnseqCompaction && forkedUnSequenceTsFileResources.get(unseqLevelNum - 1).size() > 0) { + doUnseqMerge(isForceFullMerge, getTsFileListByTimePartition(true, timePartition), + forkedUnSequenceTsFileResources.get(unseqLevelNum - 1)); } + + doLevelCompaction(forkedSequenceTsFileResources, true, timePartition, totalSeqLevelNum, + seqFileNumInEachLevel); + + doLevelCompaction(forkedUnSequenceTsFileResources, false, timePartition, unseqLevelNum, + unseqFileNumInEachLevel); + } - @SuppressWarnings("squid:S3776") - private boolean merge( - List<List<TsFileResource>> mergeResources, + @SuppressWarnings("squid:S3776") //MERGE TODO: move to a LevelCompactionExecutor + private boolean doLevelCompaction( + List<List<TsFileResource>> mergeResources, // each level is a List<TsFileResource> boolean sequence, long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) { // wait until unseq merge has finished - while (isUnseqMerging) { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - logger.error("{} [Compaction] shutdown", storageGroupName, e); - Thread.currentThread().interrupt(); - return false; - } - } - isSeqMerging = true; long startTimeMillis = System.currentTimeMillis(); // whether execute merge chunk in the loop below boolean isMergeExecutedInCurrentTask = false; @@ -646,83 +627,72 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) { // just merge part of the file isMergeExecutedInCurrentTask = true; - // level is numbered from 0 - if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) { - // do not merge current unseq file level to upper level and just merge all of them to - // seq file - isSeqMerging = false; - merge( - isForceFullMerge, - getTsFileListByTimePartition(true, timePartition), - mergeResources.get(i), - Long.MAX_VALUE); - } else { - compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName); - // log source file list and target file for recover - for (TsFileResource mergeResource : mergeResources.get(i)) { - compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); - } - File newLevelFile = - TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile()); - compactionLogger.logSequence(sequence); - compactionLogger.logFile(TARGET_NAME, newLevelFile); - List<TsFileResource> toMergeTsFiles = - mergeResources.get(i).subList(0, currMaxFileNumInEachLevel); + compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName); + // log source file list and target file for recover + for (TsFileResource mergeResource : mergeResources.get(i)) { + compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); + } + File newLevelFile = + TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile()); + compactionLogger.logSequence(sequence); + compactionLogger.logFile(TARGET_NAME, newLevelFile); + List<TsFileResource> toMergeTsFiles = + mergeResources.get(i).subList(0, currMaxFileNumInEachLevel); + logger.info( + "{} [Compaction] merge level-{}'s {} TsFiles to next level", + storageGroupName, + i, + toMergeTsFiles.size()); + for (TsFileResource toMergeTsFile : toMergeTsFiles) { logger.info( - "{} [Compaction] merge level-{}'s {} TsFiles to next level", - storageGroupName, - i, - toMergeTsFiles.size()); - for (TsFileResource toMergeTsFile : toMergeTsFiles) { - logger.info( - "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile); - } + "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile); + } - TsFileResource newResource = new TsFileResource(newLevelFile); - List<Modification> modifications = new ArrayList<>(); - // merge, read from source files and write to target file - CompactionUtils.merge( - newResource, - toMergeTsFiles, - storageGroupName, - compactionLogger, - new HashSet<>(), - sequence, - modifications); - logger.info( - "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files", - storageGroupName, - i, - toMergeTsFiles.size()); - writeLock(); - try { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException( - String.format("%s [Compaction] abort", storageGroupName)); - } + TsFileResource newResource = new TsFileResource(newLevelFile); + List<Modification> modifications = new ArrayList<>(); + // merge, read from source files and write to target file + CompactionUtils.merge( + newResource, + toMergeTsFiles, + storageGroupName, + compactionLogger, + new HashSet<>(), + sequence, + modifications); + logger.info( + "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files", + storageGroupName, + i, + toMergeTsFiles.size()); + writeLock(); + try { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException( + String.format("%s [Compaction] abort", storageGroupName)); + } - if (sequence) { - sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); - } else { - unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); - } - deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence); - if (mergeResources.size() > i + 1) { - mergeResources.get(i + 1).add(newResource); - } - } finally { - writeUnlock(); + if (sequence) { + sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + } else { + unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); } - deleteLevelFilesInDisk(toMergeTsFiles); - renameLevelFilesMods(modifications, toMergeTsFiles, newResource); - compactionLogger.close(); - File logFile = - FSFactoryProducer.getFSFactory() - .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME); - if (logFile.exists()) { - Files.delete(logFile.toPath()); + deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence); + if (mergeResources.size() > i + 1) { + mergeResources.get(i + 1).add(newResource); } + } finally { + writeUnlock(); + } + deleteLevelFilesInDisk(toMergeTsFiles); + renameLevelFilesMods(modifications, toMergeTsFiles, newResource); + compactionLogger.close(); + File logFile = + FSFactoryProducer.getFSFactory() + .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME); + if (logFile.exists()) { + Files.delete(logFile.toPath()); } + break; } } } catch (Exception e) { @@ -736,7 +706,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { restoreCompaction(); logger.error("Error occurred in Compaction Merge thread", e); } finally { - isSeqMerging = false; // reset the merge working state to false logger.info( "{} [Compaction] merge end time isSeq = {}, consumption: {} ms", @@ -749,7 +718,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) { List<SortedSet<TsFileResource>> newSequenceTsFileResources = new ArrayList<>(); - for (int i = 0; i < seqLevelNum; i++) { + for (int i = 0; i < totalSeqLevelNum; i++) { newSequenceTsFileResources.add( new TreeSet<>( (o1, o2) -> { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java index f9112e1..84c6859 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java @@ -253,7 +253,7 @@ public class MergeManager implements IService, MergeManagerMBean { private void mergeAll() { try { StorageEngine.getInstance() - .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + .mergeAll(IoTDBDescriptor.getInstance().getConfig().isFullMerge()); } catch (StorageEngineException e) { logger.error("Cannot perform a global merge because", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java index d7bc827..050d83e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.reader.resource.CachedUnseqResourceMergeReader; import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; @@ -91,26 +92,27 @@ public class MergeResource { this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList()); } - public void clear() throws IOException { - for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) { - sequenceReader.close(); - } - for (RestorableTsFileIOWriter writer : fileWriterCache.values()) { - writer.close(); - } - - fileReaderCache.clear(); - fileWriterCache.clear(); - modificationCache.clear(); - measurementSchemaMap.clear(); - chunkWriterCache.clear(); - } - public IMeasurementSchema getSchema(PartialPath path) { return measurementSchemaMap.get(path); } /** + * startMerging() is called after selecting files + * + * do not cache metadata until true candidates are chosen, or too much metadata will be + * cached during selection + */ + public void startMerging() { + cacheDeviceMeta = true; + for (TsFileResource tsFileResource : seqFiles) { + tsFileResource.setMerging(true); + } + for (TsFileResource tsFileResource : unseqFiles) { + tsFileResource.setMerging(true); + } + } + + /** * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile. * The path of the merge temp file will be the seqFile's + ".merge". * @@ -290,4 +292,20 @@ public class MergeResource { public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) { return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>()); } + + @TestOnly + public void clear() throws IOException { + for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) { + sequenceReader.close(); + } + for (RestorableTsFileIOWriter writer : fileWriterCache.values()) { + writer.close(); + } + + fileReaderCache.clear(); + fileWriterCache.clear(); + modificationCache.clear(); + measurementSchemaMap.clear(); + chunkWriterCache.clear(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java index 9693d3c..e6f84fc 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java @@ -78,6 +78,7 @@ public class LogAnalyzer { private Status status; + // MERGE TODO: add two methods: List<String> getSeqTsFiles() List<String> getUnseqTsFiles() public LogAnalyzer( MergeResource resource, String taskName, File logFile, String storageGroupName) { this.resource = resource; @@ -102,10 +103,11 @@ public class LogAnalyzer { analyzeUnseqFiles(bufferedReader); - List<PartialPath> storageGroupPaths = + // get all timeseries in this storage group + List<PartialPath> timeseriesPaths = IoTDB.metaManager.getAllTimeseriesPath(new PartialPath(storageGroupName + ".*")); unmergedPaths = new ArrayList<>(); - unmergedPaths.addAll(storageGroupPaths); + unmergedPaths.addAll(timeseriesPaths); analyzeMergedSeries(bufferedReader, unmergedPaths); @@ -120,6 +122,7 @@ public class LogAnalyzer { return; } long startTime = System.currentTimeMillis(); + // List<TsFileResource> mergeSeqFiles = new ArrayList<>(); while ((currLine = bufferedReader.readLine()) != null) { if (STR_UNSEQ_FILES.equals(currLine)) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java index 077a58b..63fc04c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java @@ -29,6 +29,10 @@ import java.util.List; */ public interface IMergeFileSelector { + /** + * @return seqFileList, unseqFileList + * @throws MergeException + */ List[] select() throws MergeException; int getConcurrentMergeNum(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java index 381651c..89f4574 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java @@ -51,7 +51,7 @@ import java.util.concurrent.Callable; * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles */ -public class MergeTask implements Callable<Void> { +public class MergeTask { public static final String MERGE_SUFFIX = ".merge"; private static final Logger logger = LoggerFactory.getLogger(MergeTask.class); @@ -67,7 +67,7 @@ public class MergeTask implements Callable<Void> { States states = States.START; MergeMultiChunkTask chunkTask; MergeFileTask fileTask; - private MergeCallback callback; + protected MergeCallback callback; MergeTask( List<TsFileResource> seqFiles, @@ -90,29 +90,18 @@ public class MergeTask implements Callable<Void> { MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback, - String taskName, boolean fullMerge, int concurrentMergeSeriesNum, String storageGroupName) { this.resource = mergeResource; this.storageGroupSysDir = storageGroupSysDir; this.callback = callback; - this.taskName = taskName; + this.taskName = storageGroupName; this.fullMerge = fullMerge; this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; this.storageGroupName = storageGroupName; } - @Override - public Void call() throws Exception { - try { - doMerge(); - } catch (Exception e) { - logger.error("Runtime exception in merge {}", taskName, e); - abort(); - } - return null; - } private void abort() throws IOException { states = States.ABORTED; @@ -125,101 +114,103 @@ public class MergeTask implements Callable<Void> { new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME)); } - private void doMerge() throws IOException, MetadataException { - if (resource.getSeqFiles().isEmpty()) { - logger.info("{} no sequence file to merge into, so will abort task.", taskName); - abort(); - return; - } - if (logger.isInfoEnabled()) { - logger.info( - "{} starts to merge {} seqFiles, {} unseqFiles", - taskName, - resource.getSeqFiles().size(), - resource.getUnseqFiles().size()); - } - long startTime = System.currentTimeMillis(); - long totalFileSize = - MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles()); - mergeLogger = new MergeLogger(storageGroupSysDir); + public void doMerge() throws IOException { + try { + if (resource.getSeqFiles().isEmpty()) { + logger.info("{} no sequence file to merge into, so will abort task.", taskName); + abort(); + return; + } + if (logger.isInfoEnabled()) { + logger.info( + "{} starts to merge {} seqFiles, {} unseqFiles", + taskName, + resource.getSeqFiles().size(), + resource.getUnseqFiles().size()); + } + long startTime = System.currentTimeMillis(); + long totalFileSize = + MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles()); + mergeLogger = new MergeLogger(storageGroupSysDir); - mergeLogger.logFiles(resource); + mergeLogger.logFiles(resource); - Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName)); - Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>(); - List<PartialPath> unmergedSeries = new ArrayList<>(); - for (PartialPath device : devices) { - MNode deviceNode = IoTDB.metaManager.getNodeByPath(device); - // todo add template merge logic - for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) { - PartialPath path = device.concatNode(entry.getKey()); - measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema()); - unmergedSeries.add(path); + Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName)); + Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>(); + List<PartialPath> unmergedSeries = new ArrayList<>(); + for (PartialPath device : devices) { + MNode deviceNode = IoTDB.metaManager.getNodeByPath(device); + // todo add template merge logic + for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) { + PartialPath path = device.concatNode(entry.getKey()); + measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema()); + unmergedSeries.add(path); + } } - } - resource.setMeasurementSchemaMap(measurementSchemaMap); + resource.setMeasurementSchemaMap(measurementSchemaMap); - mergeLogger.logMergeStart(); + mergeLogger.logMergeStart(); - chunkTask = - new MergeMultiChunkTask( - mergeContext, - taskName, - mergeLogger, - resource, - fullMerge, - unmergedSeries, - concurrentMergeSeriesNum, - storageGroupName); - states = States.MERGE_CHUNKS; - chunkTask.mergeSeries(); - if (Thread.interrupted()) { - logger.info("Merge task {} aborted", taskName); - abort(); - return; - } + chunkTask = + new MergeMultiChunkTask( + mergeContext, + taskName, + mergeLogger, + resource, + fullMerge, + unmergedSeries, + concurrentMergeSeriesNum, + storageGroupName); + states = States.MERGE_CHUNKS; + chunkTask.mergeSeries(); + if (Thread.interrupted()) { + logger.info("Merge task {} aborted", taskName); + abort(); + return; + } - fileTask = - new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles()); - states = States.MERGE_FILES; - chunkTask = null; - fileTask.mergeFiles(); - if (Thread.interrupted()) { - logger.info("Merge task {} aborted", taskName); - abort(); - return; - } + fileTask = + new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles()); + states = States.MERGE_FILES; + chunkTask = null; + fileTask.mergeFiles(); + if (Thread.interrupted()) { + logger.info("Merge task {} aborted", taskName); + abort(); + return; + } - states = States.CLEAN_UP; - fileTask = null; - cleanUp(true); - if (logger.isInfoEnabled()) { - double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0; - double byteRate = totalFileSize / elapsedTime / 1024 / 1024; - double seriesRate = unmergedSeries.size() / elapsedTime; - double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime; - double fileRate = - (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime; - double ptRate = mergeContext.getTotalPointWritten() / elapsedTime; - logger.info( - "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, " - + "fileRate: {}/s, ptRate: {}/s", - taskName, - elapsedTime, - byteRate, - seriesRate, - chunkRate, - fileRate, - ptRate); + states = States.CLEAN_UP; + fileTask = null; + cleanUp(true); + if (logger.isInfoEnabled()) { + double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0; + double byteRate = totalFileSize / elapsedTime / 1024 / 1024; + double seriesRate = unmergedSeries.size() / elapsedTime; + double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime; + double fileRate = + (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime; + double ptRate = mergeContext.getTotalPointWritten() / elapsedTime; + logger.info( + "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, " + + "fileRate: {}/s, ptRate: {}/s", + taskName, + elapsedTime, + byteRate, + seriesRate, + chunkRate, + fileRate, + ptRate); + } + } catch (Exception e) { + logger.error("Runtime exception in merge {}", taskName, e); + abort(); } } void cleanUp(boolean executeCallback) throws IOException { logger.info("{} is cleaning up", taskName); - resource.clear(); - mergeContext.clear(); - if (mergeLogger != null) { mergeLogger.close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java index 32cd897..cb1e39e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java @@ -53,6 +53,7 @@ public class RecoverMergeTask extends MergeTask { private LogAnalyzer analyzer; + // MERGE TODO: get seqFiles and unseqFiles to be recovered from merge.log) public RecoverMergeTask( List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 26261df..75b2175 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.TsFileManagement; import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; @@ -174,8 +174,8 @@ public class StorageGroupProcessor { private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>(); /** time partition id in the storage group -> tsFileProcessor for this time partition */ private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>(); - /** compactionMergeWorking is used to wait for last compaction to be done. */ - private volatile boolean compactionMergeWorking = false; + /** isCompactionWorking is used to wait for last compaction to be done. */ + private volatile boolean isCompactionWorking = false; // upgrading sequence TsFile resource list private List<TsFileResource> upgradeSeqFileList = new LinkedList<>(); @@ -446,33 +446,37 @@ public class StorageGroupProcessor { recoverTsFiles(value, false); } - String taskName = + String unseqMergeTaskName = logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis(); File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME); if (mergingMods.exists()) { this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath()); } + + // MERGE TODO: only pass the tsfileManagement into RecoverMergeTask RecoverMergeTask recoverMergeTask = new RecoverMergeTask( new ArrayList<>(tsFileManagement.getTsFileList(true)), tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), tsFileManagement::mergeEndAction, - taskName, - IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), + unseqMergeTaskName, + IoTDBDescriptor.getInstance().getConfig().isFullMerge(), logicalStorageGroupName + "-" + virtualStorageGroupId); logger.info( "{} - {} a RecoverMergeTask {} starts...", logicalStorageGroupName, virtualStorageGroupId, - taskName); + unseqMergeTaskName); recoverMergeTask.recoverMerge( IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()); if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) { mergingMods.delete(); } - recoverCompaction(); + + // MERGE TODO: move unseqMergeRecover into TsFileManagement.CompactionRecoverTask() + recoverLevelCompaction(); for (TsFileResource resource : tsFileManagement.getTsFileList(true)) { long partitionNum = resource.getTimePartition(); updatePartitionFileVersion(partitionNum, resource.getVersion()); @@ -511,27 +515,25 @@ public class StorageGroupProcessor { globalLatestFlushedTimeForEachDevice.putAll(endTimeMap); } - if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction() - && seqTsFileResources.size() > 0) { + if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) { for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) { - executeCompaction( - timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + executeCompaction(timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge()); } } } - private void recoverCompaction() { - if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) { - compactionMergeWorking = true; + private void recoverLevelCompaction() { + if (!CompactionTaskManager.getInstance().isTerminated()) { + isCompactionWorking = true; logger.info( "{} - {} submit a compaction recover merge task", logicalStorageGroupName, virtualStorageGroupId); try { - CompactionMergeTaskPoolManager.getInstance() + CompactionTaskManager.getInstance() .submitTask( logicalStorageGroupName, - tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack)); + tsFileManagement.new LevelCompactionRecoverTask(this::closeCompactionMergeCallBack)); } catch (RejectedExecutionException e) { this.closeCompactionMergeCallBack(false, 0); logger.error( @@ -1971,20 +1973,20 @@ public class StorageGroupProcessor { executeCompaction( tsFileProcessor.getTimeRangeId(), - IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + IoTDBDescriptor.getInstance().getConfig().isFullMerge()); } private void executeCompaction(long timePartition, boolean fullMerge) { - if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) { - compactionMergeWorking = true; + if (!isCompactionWorking && !CompactionTaskManager.getInstance().isTerminated()) { + isCompactionWorking = true; logger.info( "{} submit a compaction merge task", logicalStorageGroupName + "-" + virtualStorageGroupId); try { // fork and filter current tsfile, then commit then to compaction merge tsFileManagement.forkCurrentFileList(timePartition); - tsFileManagement.setForceFullMerge(fullMerge); - CompactionMergeTaskPoolManager.getInstance() + tsFileManagement.setFullMerge(fullMerge); + CompactionTaskManager.getInstance() .submitTask( logicalStorageGroupName, tsFileManagement @@ -2007,9 +2009,9 @@ public class StorageGroupProcessor { private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) { if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) { executeCompaction( - timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge()); } else { - this.compactionMergeWorking = false; + this.isCompactionWorking = false; } } @@ -2842,7 +2844,7 @@ public class StorageGroupProcessor { writeLock(); try { // abort ongoing comapctions and merges - CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName); + CompactionTaskManager.getInstance().abortCompaction(logicalStorageGroupName); MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName); // close all working files that should be removed removePartitions(filter, workSequenceTsFileProcessors.entrySet()); diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index 51a64ac..e4458ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.cost.statistic.Measurement; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor; -import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService; @@ -108,7 +108,7 @@ public class IoTDB implements IoTDBMBean { registerManager.register(TVListAllocator.getInstance()); registerManager.register(CacheHitRatioMonitor.getInstance()); registerManager.register(MergeManager.getINSTANCE()); - registerManager.register(CompactionMergeTaskPoolManager.getInstance()); + registerManager.register(CompactionTaskManager.getInstance()); JMXService.registerMBean(getInstance(), mbeanName); registerManager.register(StorageEngine.getInstance()); registerManager.register(TemporaryQueryDataFileService.getInstance()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java index cf7441f..1c0dfeb 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java @@ -69,11 +69,10 @@ public class MergeLogTest extends MergeTest { new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)), tempSGDir.getPath(), this::testCallBack, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); } private void testCallBack( diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java index 9842f53..2232521 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java @@ -99,20 +99,11 @@ public class MergeManagerTest extends MergeTest { private String progress = "0"; public FakedMainMergeTask(int serialNum) { - super(null, null, null, null, false, 0, null); + super(null, null, null, false, 0, null); this.serialNum = serialNum; } @Override - public Void call() { - while (!Thread.currentThread().isInterrupted()) { - // wait until interrupt - } - progress = "1"; - return null; - } - - @Override public String getStorageGroupName() { return "test"; } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java index 6c70fe7..0684f46 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java @@ -185,11 +185,10 @@ public class MergeOverLapTest extends MergeTest { new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, - "test", true, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java index ca816cf..84d4c5f 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java @@ -48,8 +48,8 @@ public class MergePerfTest extends MergeTest { resource.setCacheDeviceMeta(true); MergeTask mergeTask = new MergeTask( - resource, tempSGDir.getPath(), (k, v, l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG); - mergeTask.call(); + resource, tempSGDir.getPath(), (k, v, l) -> {}, fullMerge, 100, MERGE_TEST_SG); + mergeTask.doMerge(); timeConsumption = System.currentTimeMillis() - timeConsumption; tearDown(); FileUtils.deleteDirectory(tempSGDir); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java index d4f5dfe..5f7c036 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java @@ -79,11 +79,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -123,11 +122,10 @@ public class MergeTaskTest extends MergeTest { (k, v, l) -> { assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1")); }, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); } @Test @@ -177,11 +175,10 @@ public class MergeTaskTest extends MergeTest { (k, v, l) -> { assertEquals(49, k.get(0).getEndTime("root.mergeTest.device1")); }, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); } @Test @@ -191,11 +188,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, - "test", true, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -235,11 +231,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -275,11 +270,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(), (k, v, l) -> {}, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -319,11 +313,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(seqResources, unseqResources.subList(5, 6)), tempSGDir.getPath(), (k, v, l) -> {}, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -359,11 +352,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(seqResources, unseqResources.subList(0, 5)), tempSGDir.getPath(), (k, v, l) -> {}, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -424,11 +416,10 @@ public class MergeTaskTest extends MergeTest { e.printStackTrace(); } }, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -474,11 +465,10 @@ public class MergeTaskTest extends MergeTest { new MergeResource(testSeqResources, testUnseqResource), tempSGDir.getPath(), (k, v, l) -> {}, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); QueryContext context = new QueryContext(); PartialPath path = @@ -540,11 +530,10 @@ public class MergeTaskTest extends MergeTest { (k, v, l) -> { assertEquals(99, k.get(0).getEndTime("root.mergeTest.device1")); }, - "test", false, 1, MERGE_TEST_SG); - mergeTask.call(); + mergeTask.doMerge(); } private void prepareFileWithLastSensor( diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 3d7e2b6..8ad49c7 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -610,7 +610,7 @@ public class StorageGroupProcessorTest { } processor.syncCloseAllWorkingTsFileProcessors(); - processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + processor.merge(IoTDBDescriptor.getInstance().getConfig().isFullMerge()); while (processor.getTsFileManagement().isUnseqMerging) { // wait } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java index d01b9d9..289ce56 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.integration; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; @@ -379,7 +379,7 @@ public class IoTDBRestartIT { } try { - CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish(); + CompactionTaskManager.getInstance().waitAllCompactionFinish(); Thread.sleep(10000); EnvironmentUtils.restartDaemon(); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index fc29451..9892780 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -27,7 +27,7 @@ import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; -import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TriggerManagementException; @@ -86,7 +86,7 @@ public class EnvironmentUtils { public static void cleanEnv() throws IOException, StorageEngineException { // wait all compaction finished - CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish(); + CompactionTaskManager.getInstance().waitAllCompactionFinish(); // deregister all user defined classes try {
