This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6452379f5c1f0e5c7e14325e46a4324007519115 Author: 张凌哲 <[email protected]> AuthorDate: Sun Oct 4 17:42:20 2020 +0800 add unseq merge config and logic with instance unseq merge --- .../resources/conf/iotdb-engine.properties | 16 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 35 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 + .../engine/storagegroup/StorageGroupProcessor.java | 426 +++++---------------- .../engine/tsfilemanagement/TsFileManagement.java | 230 +++++++++++ .../level/LevelTsFileManagement.java | 230 +++++------ .../iotdb/db/engine/merge/MergeManagerTest.java | 4 +- .../storagegroup/StorageGroupProcessorTest.java | 327 +--------------- .../iotdb/db/integration/IoTDBMergeTest.java | 2 +- 9 files changed, 527 insertions(+), 753 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 2e6577b..97ded95 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -260,12 +260,20 @@ default_fill_interval=-1 tsfile_manage_strategy=NORMAL_STRATEGY # Work when tsfile_manage_strategy is level_strategy. -# The max file num of each level. When file num exceeds this, the files in one level will merge to one. -max_file_num_in_each_level=100 +# The max seq file num of each level. When file num exceeds this, the files in one level will merge to one. +max_file_num_in_each_level=10 # Work when tsfile_manage_strategy is level_strategy. -# The max num of level. -max_level_num=2 +# The max num of seq level. +max_level_num=4 + +# Work when tsfile_manage_strategy is level_strategy. +# The max unseq file num of each level. When file num exceeds this, the files in one level will merge to one. +max_unseq_file_num_in_each_level=10 + +# Work when tsfile_manage_strategy is level_strategy. +# The max num of unseq level. +max_unseq_level_num=2 # Work when tsfile_manage_strategy is level_strategy. # When merge point number reaches this, merge the files to the last level. diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 54cf9e3..5322c07 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -266,15 +266,26 @@ public class IoTDBConfig { private TsFileManagementStrategy tsFileManagementStrategy = TsFileManagementStrategy.NORMAL_STRATEGY; /** - * Work when tsfile_manage_strategy is level_strategy. The max file num of each level. When file + * Work when tsfile_manage_strategy is level_strategy. The max seq file num of each level. When file * num exceeds this, the files in one level will merge to one. */ - private int maxFileNumInEachLevel = 100; + private int maxFileNumInEachLevel = 10; /** - * Work when tsfile_manage_strategy is level_strategy. The max num of level. + * Work when tsfile_manage_strategy is level_strategy. The max num of seq level. */ - private int maxLevelNum = 2; + private int maxLevelNum = 4; + + /** + * Work when tsfile_manage_strategy is level_strategy. The max unseq file num of each level. When file + * num exceeds this, the files in one level will merge to one. + */ + private int maxUnseqFileNumInEachLevel = 10; + + /** + * Work when tsfile_manage_strategy is level_strategy. The max num of unseq level. + */ + private int maxUnseqLevelNum = 2; /** * whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. @@ -1308,6 +1319,22 @@ public class IoTDBConfig { this.maxLevelNum = maxLevelNum; } + public int getMaxUnseqFileNumInEachLevel() { + return maxUnseqFileNumInEachLevel; + } + + public void setMaxUnseqFileNumInEachLevel(int maxUnseqFileNumInEachLevel) { + this.maxUnseqFileNumInEachLevel = maxUnseqFileNumInEachLevel; + } + + public int getMaxUnseqLevelNum() { + return maxUnseqLevelNum; + } + + public void setMaxUnseqLevelNum(int maxUnseqLevelNum) { + this.maxUnseqLevelNum = maxUnseqLevelNum; + } + public int getMergeChunkSubThreadNum() { return mergeChunkSubThreadNum; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index fbeccfd..b77f4d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -287,6 +287,14 @@ public class IoTDBDescriptor { .getProperty("max_file_num_in_each_level", Integer.toString(conf.getMaxFileNumInEachLevel())))); + conf.setMaxUnseqLevelNum(Integer.parseInt(properties + .getProperty("max_unseq_level_num", + Integer.toString(conf.getMaxUnseqLevelNum())))); + + conf.setMaxUnseqFileNumInEachLevel(Integer.parseInt(properties + .getProperty("max_unseq_file_num_in_each_level", + Integer.toString(conf.getMaxUnseqFileNumInEachLevel())))); + conf.setSyncEnable(Boolean .parseBoolean(properties.getProperty("is_sync_enable", Boolean.toString(conf.isSyncEnable())))); @@ -357,6 +365,8 @@ public class IoTDBDescriptor { Boolean.toString(conf.isForceFullMerge())))); conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty( "chunk_merge_point_threshold", Integer.toString(conf.getChunkMergePointThreshold())))); + conf.setMergeThroughputMbPerSec(Integer.parseInt(properties.getProperty( + "merge_throughput_mb_per_sec", Integer.toString(conf.getMergeThroughputMbPerSec())))); conf.setEnablePartialInsert( Boolean.parseBoolean(properties.getProperty("enable_partial_insert", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index bf09d1b..5038d00 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -49,15 +49,8 @@ import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; import org.apache.iotdb.db.engine.merge.manage.MergeManager; -import org.apache.iotdb.db.engine.merge.manage.MergeResource; -import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector; -import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector; -import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector; -import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy; -import org.apache.iotdb.db.engine.merge.task.MergeTask; import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask; import org.apache.iotdb.db.engine.modification.Deletion; -import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.tsfilemanagement.HotCompactionMergeTaskPoolManager; @@ -67,7 +60,6 @@ import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.BatchInsertionException; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.LoadFileException; -import org.apache.iotdb.db.exception.MergeException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; @@ -123,8 +115,7 @@ import org.slf4j.LoggerFactory; */ public class StorageGroupProcessor { - private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods"; - private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder"; + public static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods"; /** * All newly generated chunks after merge have version number 0, so we set merged Modification @@ -154,6 +145,11 @@ public class StorageGroupProcessor { */ private final Object closeStorageGroupCondition = new Object(); /** + * hotCompactionMergeWorking is used to wait for last hot compaction to be done. + */ + private volatile boolean hotCompactionMergeWorking = false; + + /** * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a query is executed. */ private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock(); @@ -165,10 +161,7 @@ public class StorageGroupProcessor { * time partition id in the storage group -> tsFileProcessor for this time partition */ private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>(); - /** - * hotCompactionMergeWorking is used to wait for last hot compaction to be done. - */ - private volatile boolean hotCompactionMergeWorking = false; + // upgrading sequence TsFile resource list private List<TsFileResource> upgradeSeqFileList = new LinkedList<>(); @@ -208,7 +201,7 @@ public class StorageGroupProcessor { private File storageGroupSysDir; // manage seqFileList and unSeqFileList - private TsFileManagement tsFileManagement; + public TsFileManagement tsFileManagement; /** * time partition id -> version controller which assigns a version for each MemTable and @@ -216,18 +209,7 @@ public class StorageGroupProcessor { * updates can be re-determined. */ private HashMap<Long, VersionController> timePartitionIdVersionControllerMap = new HashMap<>(); - /** - * mergeLock is to be used in the merge process. Concurrent queries, deletions and merges may - * result in losing some deletion in the merged new file, so a lock is necessary. - */ - private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock(); - /** - * This is the modification file of the result of the current merge. Because the merged file may - * be invisible at this moment, without this, deletion/update during merge could be lost. - */ - private ModificationFile mergingModification; - private volatile boolean isMerging = false; - private long mergeStartTime; + /** * when the data in a storage group is older than dataTTL, it is considered invalid and will be * eventually removed. @@ -329,11 +311,11 @@ public class StorageGroupProcessor { File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME); if (mergingMods.exists()) { - mergingModification = new ModificationFile(mergingMods.getPath()); + this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath()); } RecoverMergeTask recoverMergeTask = new RecoverMergeTask( new ArrayList<>(tsFileManagement.getTsFileList(true)), - tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), this::mergeEndAction, + tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), tsFileManagement::mergeEndAction, taskName, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName); logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName); @@ -476,19 +458,19 @@ public class StorageGroupProcessor { // move .tsfile to upgrade folder for (File file : oldTsfileArray) { if (!file.renameTo(fsFactory.getFile(upgradeFolder, file.getName()))) { - logger.error(FAIL_TO_UPGRADE_FOLDER, file); + logger.error("Failed to move {} to upgrade folder", file); } } // move .resource to upgrade folder for (File file : oldResourceFileArray) { if (!file.renameTo(fsFactory.getFile(upgradeFolder, file.getName()))) { - logger.error(FAIL_TO_UPGRADE_FOLDER, file); + logger.error("Failed to move {} to upgrade folder", file); } } // move .mods to upgrade folder for (File file : oldModificationFileArray) { if (!file.renameTo(fsFactory.getFile(upgradeFolder, file.getName()))) { - logger.error(FAIL_TO_UPGRADE_FOLDER, file); + logger.error("Failed to move {} to upgrade folder", file); } } @@ -624,22 +606,14 @@ public class StorageGroupProcessor { // init map long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime()); + latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); partitionLatestFlushedTimeForEachDevice .computeIfAbsent(timePartitionId, id -> new HashMap<>()); - boolean isSequence = - insertRowPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) - .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE); - - //is unsequence and user set config to discard out of order data - if (!isSequence && IoTDBDescriptor.getInstance().getConfig() - .isEnableDiscardOutOfOrderData()) { - return; - } - - latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); // insert to sequence or unSequence file - insertToTsFileProcessor(insertRowPlan, isSequence); + insertToTsFileProcessor(insertRowPlan, + insertRowPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) + .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)); } finally { writeUnlock(); @@ -696,12 +670,9 @@ public class StorageGroupProcessor { // start next partition if (curTimePartition != beforeTimePartition) { // insert last time partition - if (isSequence || !IoTDBDescriptor.getInstance().getConfig() - .isEnableDiscardOutOfOrderData()) { - noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, - results, - beforeTimePartition) && noFailure; - } + noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, + results, + beforeTimePartition) && noFailure; // re initialize before = loc; beforeTimePartition = curTimePartition; @@ -715,11 +686,8 @@ public class StorageGroupProcessor { // judge if we should insert sequence if (!isSequence && time > lastFlushTime) { // insert into unsequence and then start sequence - if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) { - noFailure = - insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results, - beforeTimePartition) && noFailure; - } + noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results, + beforeTimePartition) && noFailure; before = loc; isSequence = true; } @@ -728,8 +696,7 @@ public class StorageGroupProcessor { } // do not forget last part - if (before < loc && (isSequence || !IoTDBDescriptor.getInstance().getConfig() - .isEnableDiscardOutOfOrderData())) { + if (before < loc) { noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results, beforeTimePartition) && noFailure; } @@ -757,11 +724,11 @@ public class StorageGroupProcessor { * inserted are in the range [start, end) * * @param insertTabletPlan insert a tablet of a device - * @param sequence whether is sequence - * @param start start index of rows to be inserted in insertTabletPlan - * @param end end index of rows to be inserted in insertTabletPlan - * @param results result array - * @param timePartitionId time partition id + * @param sequence whether is sequence + * @param start start index of rows to be inserted in insertTabletPlan + * @param end end index of rows to be inserted in insertTabletPlan + * @param results result array + * @param timePartitionId time partition id * @return false if any failure occurs when inserting the tablet, true otherwise */ private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan, @@ -804,9 +771,6 @@ public class StorageGroupProcessor { } private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) { - if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { - return; - } MeasurementMNode[] mNodes = plan.getMeasurementMNodes(); for (int i = 0; i < mNodes.length; i++) { if (plan.getColumns()[i] == null) { @@ -859,9 +823,6 @@ public class StorageGroupProcessor { } private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) { - if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { - return; - } MeasurementMNode[] mNodes = plan.getMeasurementMNodes(); for (int i = 0; i < mNodes.length; i++) { if (plan.getValues()[i] == null) { @@ -907,9 +868,9 @@ public class StorageGroupProcessor { /** * get processor from hashmap, flush oldest processor if necessary * - * @param timeRangeId time partition range + * @param timeRangeId time partition range * @param tsFileProcessorTreeMap tsFileProcessorTreeMap - * @param sequence whether is sequence or not + * @param sequence whether is sequence or not */ private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, @@ -1074,11 +1035,12 @@ public class StorageGroupProcessor { syncCloseAllWorkingTsFileProcessors(); //normally, mergingModification is just need to be closed by after a merge task is finished. //we close it here just for IT test. - if (this.mergingModification != null) { + if (this.tsFileManagement.mergingModification != null) { try { - mergingModification.close(); + this.tsFileManagement.mergingModification.close(); } catch (IOException e) { - logger.error("Cannot close the mergingMod file {}", mergingModification.getFilePath(), e); + logger.error("Cannot close the mergingMod file {}", + this.tsFileManagement.mergingModification.getFilePath(), e); } } @@ -1246,7 +1208,7 @@ public class StorageGroupProcessor { public QueryDataSource query(PartialPath deviceId, String measurementId, QueryContext context, QueryFileManager filePathsManager, Filter timeFilter) throws QueryProcessException { insertLock.readLock().lock(); - mergeLock.readLock().lock(); + tsFileManagement.mergeLock.readLock().lock(); tsFileManagement.readLock(); try { List<TsFileResource> seqResources = getFileResourceListForQuery( @@ -1269,7 +1231,7 @@ public class StorageGroupProcessor { throw new QueryProcessException(e); } finally { tsFileManagement.readUnLock(); - mergeLock.readLock().unlock(); + tsFileManagement.mergeLock.readLock().unlock(); insertLock.readLock().unlock(); } } @@ -1364,10 +1326,10 @@ public class StorageGroupProcessor { * Delete data whose timestamp <= 'timestamp' and belongs to the time series * deviceId.measurementId. * - * @param deviceId the deviceId of the timeseries to be deleted. + * @param deviceId the deviceId of the timeseries to be deleted. * @param measurementId the measurementId of the timeseries to be deleted. - * @param startTime the startTime of delete range. - * @param endTime the endTime of delete range. + * @param startTime the startTime of delete range. + * @param endTime the endTime of delete range. */ public void delete(PartialPath deviceId, String measurementId, long startTime, long endTime) throws IOException { @@ -1375,7 +1337,7 @@ public class StorageGroupProcessor { // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened //mod files in mergingModification, sequenceFileList, and unsequenceFileList writeLock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); tsFileManagement.writeLock(); // record files which are updated so that we can roll back them in case of exception @@ -1402,9 +1364,9 @@ public class StorageGroupProcessor { tryToDeleteLastCache(deviceId, measurementId, startTime, endTime); Deletion deletion = new Deletion(deviceId.concatNode(measurementId), MERGE_MOD_START_VERSION_NUM, startTime, endTime); - if (mergingModification != null) { - mergingModification.write(deletion); - updatedModFiles.add(mergingModification); + if (tsFileManagement.mergingModification != null) { + tsFileManagement.mergingModification.write(deletion); + updatedModFiles.add(tsFileManagement.mergingModification); } deleteDataInFiles(tsFileManagement.getTsFileList(true), deletion, updatedModFiles); @@ -1418,7 +1380,7 @@ public class StorageGroupProcessor { throw new IOException(e); } finally { tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); writeUnlock(); } } @@ -1477,11 +1439,12 @@ public class StorageGroupProcessor { private void tryToDeleteLastCache(PartialPath deviceId, String measurementId, long startTime, long endTime) throws WriteProcessException { + MNode node = null; try { MManager manager = MManager.getInstance(); - MNode node = manager.getDeviceNodeWithAutoCreate(deviceId); + node = manager.getDeviceNodeWithAutoCreateAndReadLock(deviceId); - MNode measurementNode = node.getChild(measurementId); + MNode measurementNode = manager.getChild(node, measurementId); if (measurementNode != null) { TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast(); if (lastPair != null && startTime <= lastPair.getTimestamp() @@ -1491,6 +1454,10 @@ public class StorageGroupProcessor { } } catch (MetadataException e) { throw new WriteProcessException(e); + } finally { + if (node != null) { + node.readUnlock(); + } } } @@ -1634,7 +1601,7 @@ public class StorageGroupProcessor { ); } insertLock.writeLock().lock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); tsFileManagement.writeLock(); if (tsFileResource.isSeq()) { tsFileManagement.addAll(upgradedResources, true); @@ -1644,7 +1611,7 @@ public class StorageGroupProcessor { upgradeUnseqFileList.remove(tsFileResource); } tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); insertLock.writeLock().unlock(); // after upgrade complete, update partitionLatestFlushedTimeForEachDevice @@ -1672,206 +1639,13 @@ public class StorageGroupProcessor { public void merge(boolean fullMerge) { writeLock(); try { - if (isMerging) { - if (logger.isInfoEnabled()) { - logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName, - (System.currentTimeMillis() - mergeStartTime)); - } - return; - } - logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName, - fullMerge); - - List<TsFileResource> seqMergeList = tsFileManagement.getStableTsFileList(true); - List<TsFileResource> unSeqMergeList = tsFileManagement.getStableTsFileList(false); - if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) { - logger.info("{} no files to be merged", storageGroupName); - return; - } - - long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget(); - long timeLowerBound = System.currentTimeMillis() - dataTTL; - MergeResource mergeResource = new MergeResource(seqMergeList, unSeqMergeList, timeLowerBound); - - IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource); - try { - List[] mergeFiles = fileSelector.select(); - if (mergeFiles.length == 0) { - logger.info("{} cannot select merge candidates under the budget {}", storageGroupName, - budget); - return; - } - // avoid pending tasks holds the metadata and streams - mergeResource.clear(); - String taskName = storageGroupName + "-" + System.currentTimeMillis(); - // do not cache metadata until true candidates are chosen, or too much metadata will be - // cached during selection - mergeResource.setCacheDeviceMeta(true); - - for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) { - tsFileResource.setMerging(true); - } - for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) { - tsFileResource.setMerging(true); - } - - MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(), - this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), - storageGroupName); - mergingModification = new ModificationFile( - storageGroupSysDir + File.separator + MERGING_MODIFICATION_FILE_NAME); - MergeManager.getINSTANCE().submitMainTask(mergeTask); - if (logger.isInfoEnabled()) { - logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles", - storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size()); - } - isMerging = true; - mergeStartTime = System.currentTimeMillis(); - - } catch (MergeException | IOException e) { - logger.error("{} cannot select file for merge", storageGroupName, e); - } + this.tsFileManagement.merge(fullMerge, tsFileManagement.getTsFileList(true), + tsFileManagement.getTsFileList(false),dataTTL); } finally { writeUnlock(); } } - private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) { - MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy(); - switch (strategy) { - case MAX_FILE_NUM: - return new MaxFileMergeFileSelector(resource, budget); - case MAX_SERIES_NUM: - return new MaxSeriesMergeFileSelector(resource, budget); - default: - throw new UnsupportedOperationException("Unknown MergeFileStrategy " + strategy); - } - } - - private void removeUnseqFiles(List<TsFileResource> unseqFiles) { - mergeLock.writeLock().lock(); - tsFileManagement.writeLock(); - try { - tsFileManagement.removeAll(unseqFiles, false); - } finally { - tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); - } - - for (TsFileResource unseqFile : unseqFiles) { - unseqFile.writeLock(); - try { - unseqFile.remove(); - } finally { - unseqFile.writeUnlock(); - } - } - } - - @SuppressWarnings("squid:S1141") - private void updateMergeModification(TsFileResource seqFile) { - try { - // remove old modifications and write modifications generated during merge - seqFile.removeModFile(); - if (mergingModification != null) { - for (Modification modification : mergingModification.getModifications()) { - seqFile.getModFile().write(modification); - } - try { - seqFile.getModFile().close(); - } catch (IOException e) { - logger - .error("Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e); - } - } - } catch (IOException e) { - logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName, - seqFile.getTsFile(), e); - } - } - - private void removeMergingModification() { - try { - if (mergingModification != null) { - mergingModification.remove(); - mergingModification = null; - } - } catch (IOException e) { - logger.error("{} cannot remove merging modification ", storageGroupName, e); - } - } - - protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, - File mergeLog) { - logger.info("{} a merge task is ending...", storageGroupName); - - if (unseqFiles.isEmpty()) { - // merge runtime exception arose, just end this merge - isMerging = false; - logger.info("{} a merge task abnormally ends", storageGroupName); - return; - } - - removeUnseqFiles(unseqFiles); - - for (int i = 0; i < seqFiles.size(); i++) { - TsFileResource seqFile = seqFiles.get(i); - // get both seqFile lock and merge lock - doubleWriteLock(seqFile); - - try { - updateMergeModification(seqFile); - if (i == seqFiles.size() - 1) { - //FIXME if there is an exception, the the modification file will be not closed. - removeMergingModification(); - isMerging = false; - mergeLog.delete(); - } - } finally { - doubleWriteUnlock(seqFile); - } - } - logger.info("{} a merge task ends", storageGroupName); - } - - /** - * acquire the write locks of the resource , the merge lock and the hot compaction lock - */ - private void doubleWriteLock(TsFileResource seqFile) { - boolean fileLockGot; - boolean mergeLockGot; - boolean hotCompactionLockGot; - while (true) { - fileLockGot = seqFile.tryWriteLock(); - mergeLockGot = mergeLock.writeLock().tryLock(); - hotCompactionLockGot = tsFileManagement.tryWriteLock(); - - if (fileLockGot && mergeLockGot && hotCompactionLockGot) { - break; - } else { - // did not get all of them, release the gotten one and retry - if (hotCompactionLockGot) { - tsFileManagement.writeUnlock(); - } - if (mergeLockGot) { - mergeLock.writeLock().unlock(); - } - if (fileLockGot) { - seqFile.writeUnlock(); - } - } - } - } - - /** - * release the write locks of the resource , the merge lock and the hot compaction lock - */ - private void doubleWriteUnlock(TsFileResource seqFile) { - tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); - seqFile.writeUnlock(); - } - /** * Load a new tsfile to storage group processor. Tne file may have overlap with other files. * <p> @@ -1888,7 +1662,7 @@ public class StorageGroupProcessor { File tsfileToBeInserted = newTsFileResource.getTsFile(); long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); writeLock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); tsFileManagement.writeLock(); try { if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource, @@ -1903,7 +1677,7 @@ public class StorageGroupProcessor { throw new LoadFileException(e); } finally { tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); writeUnlock(); } } @@ -1926,7 +1700,7 @@ public class StorageGroupProcessor { File tsfileToBeInserted = newTsFileResource.getTsFile(); long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); writeLock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); tsFileManagement.writeLock(); try { List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true); @@ -1972,7 +1746,7 @@ public class StorageGroupProcessor { throw new LoadFileException(e); } finally { tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); writeUnlock(); } } @@ -2165,9 +1939,9 @@ public class StorageGroupProcessor { * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the * version number is the version number in the tsfile with a larger timestamp. * - * @param tsfileName origin tsfile name + * @param tsfileName origin tsfile name * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex + - * 1] + * 1] * @return appropriate filename */ private String getFileNameForLoadingFile(String tsfileName, int insertIndex, @@ -2231,8 +2005,8 @@ public class StorageGroupProcessor { /** * Execute the loading process by the type. * - * @param type load type - * @param tsFileResource tsfile resource to be loaded + * @param type load type + * @param tsFileResource tsfile resource to be loaded * @param filePartitionId the partition id of the new file * @return load the file successfully * @UsedBy sync module, load external tsfile module. @@ -2323,7 +2097,7 @@ public class StorageGroupProcessor { */ public boolean deleteTsfile(File tsfieToBeDeleted) { writeLock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); tsFileManagement.writeLock(); TsFileResource tsFileResourceToBeDeleted = null; try { @@ -2349,7 +2123,7 @@ public class StorageGroupProcessor { } } finally { tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); writeUnlock(); } if (tsFileResourceToBeDeleted == null) { @@ -2383,7 +2157,7 @@ public class StorageGroupProcessor { */ public boolean moveTsfile(File fileToBeMoved, File targetDir) { writeLock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); tsFileManagement.writeLock(); TsFileResource tsFileResourceToBeMoved = null; try { @@ -2409,7 +2183,7 @@ public class StorageGroupProcessor { } } finally { tsFileManagement.writeUnlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); writeUnlock(); } if (tsFileResourceToBeMoved == null) { @@ -2486,13 +2260,41 @@ public class StorageGroupProcessor { return partitionFileVersions.containsAll(tsFileResource.getHistoricalVersions()); } + private enum LoadTsFileType { + LOAD_SEQUENCE, LOAD_UNSEQUENCE + } + + @FunctionalInterface + public interface CloseTsFileCallBack { + + void call(TsFileProcessor caller) throws TsFileProcessorException, IOException; + } + + @FunctionalInterface + public interface UpdateEndTimeCallBack { + + boolean call(TsFileProcessor caller); + } + + @FunctionalInterface + public interface UpgradeTsFileResourceCallBack { + + void call(TsFileResource caller); + } + + @FunctionalInterface + public interface CloseHotCompactionMergeCallBack { + + void call(); + } + /** * remove all partitions that satisfy a filter. */ public void removePartitions(TimePartitionFilter filter) { // this requires blocking all other activities insertLock.writeLock().lock(); - mergeLock.writeLock().lock(); + tsFileManagement.mergeLock.writeLock().lock(); try { // abort ongoing merges MergeManager.getINSTANCE().abortMerge(storageGroupName); @@ -2506,7 +2308,7 @@ public class StorageGroupProcessor { } finally { insertLock.writeLock().unlock(); - mergeLock.writeLock().unlock(); + tsFileManagement.mergeLock.writeLock().unlock(); } } @@ -2536,38 +2338,6 @@ public class StorageGroupProcessor { } } - public boolean isHotCompactionMergeWorking() { - return hotCompactionMergeWorking; - } - - private enum LoadTsFileType { - LOAD_SEQUENCE, LOAD_UNSEQUENCE - } - - @FunctionalInterface - public interface CloseTsFileCallBack { - - void call(TsFileProcessor caller) throws TsFileProcessorException, IOException; - } - - @FunctionalInterface - public interface UpdateEndTimeCallBack { - - boolean call(TsFileProcessor caller); - } - - @FunctionalInterface - public interface UpgradeTsFileResourceCallBack { - - void call(TsFileResource caller); - } - - @FunctionalInterface - public interface CloseHotCompactionMergeCallBack { - - void call(); - } - @FunctionalInterface public interface TimePartitionFilter { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java index dcb2aaa..a59bee4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java @@ -19,24 +19,56 @@ package org.apache.iotdb.db.engine.tsfilemanagement; +import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME; + +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.manage.MergeManager; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector; +import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector; +import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector; +import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy; +import org.apache.iotdb.db.engine.merge.task.MergeTask; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseHotCompactionMergeCallBack; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.tsfilemanagement.level.LevelTsFileManagement; +import org.apache.iotdb.db.exception.MergeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class TsFileManagement { + private static final Logger logger = LoggerFactory.getLogger(TsFileManagement.class); protected String storageGroupName; protected String storageGroupDir; + + /** + * mergeLock is to be used in the merge process. Concurrent queries, deletions and merges may + * result in losing some deletion in the merged new file, so a lock is necessary. + */ + public ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock(); /** * hotCompactionMergeLock is used to wait for TsFile list change in hot compaction merge * processor. */ private final ReadWriteLock hotCompactionMergeLock = new ReentrantReadWriteLock(); + public volatile boolean isUnseqMerging = false; + /** + * This is the modification file of the result of the current merge. Because the merged file may + * be invisible at this moment, without this, deletion/update during merge could be lost. + */ + public ModificationFile mergingModification; + public long mergeStartTime; + public TsFileManagement(String storageGroupName, String storageGroupDir) { this.storageGroupName = storageGroupName; this.storageGroupDir = storageGroupDir; @@ -146,4 +178,202 @@ public abstract class TsFileManagement { closeHotCompactionMergeCallBack.call(); } } + + public void merge(boolean fullMerge, List<TsFileResource> seqMergeList, + List<TsFileResource> unSeqMergeList, long dataTTL) { + if (isUnseqMerging) { + if (logger.isInfoEnabled()) { + logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName, + (System.currentTimeMillis() - mergeStartTime)); + } + return; + } + logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName, + fullMerge); + + if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) { + logger.info("{} no files to be merged", storageGroupName); + return; + } + + long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget(); + long timeLowerBound = System.currentTimeMillis() - dataTTL; + MergeResource mergeResource = new MergeResource(seqMergeList, unSeqMergeList, timeLowerBound); + + IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource); + try { + List[] mergeFiles = fileSelector.select(); + if (mergeFiles.length == 0) { + logger.info("{} cannot select merge candidates under the budget {}", storageGroupName, + budget); + return; + } + // avoid pending tasks holds the metadata and streams + mergeResource.clear(); + String taskName = storageGroupName + "-" + System.currentTimeMillis(); + // do not cache metadata until true candidates are chosen, or too much metadata will be + // cached during selection + mergeResource.setCacheDeviceMeta(true); + + for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) { + tsFileResource.setMerging(true); + } + for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) { + tsFileResource.setMerging(true); + } + + MergeTask mergeTask = new MergeTask(mergeResource, storageGroupDir, + this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), + storageGroupName); + mergingModification = new ModificationFile( + storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME); + MergeManager.getINSTANCE().submitMainTask(mergeTask); + if (logger.isInfoEnabled()) { + logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles", + storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size()); + } + isUnseqMerging = true; + mergeStartTime = System.currentTimeMillis(); + + } catch (MergeException | IOException e) { + logger.error("{} cannot select file for merge", storageGroupName, e); + } + } + + private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) { + MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy(); + switch (strategy) { + case MAX_FILE_NUM: + return new MaxFileMergeFileSelector(resource, budget); + case MAX_SERIES_NUM: + return new MaxSeriesMergeFileSelector(resource, budget); + default: + throw new UnsupportedOperationException("Unknown MergeFileStrategy " + strategy); + } + } + + /** + * acquire the write locks of the resource , the merge lock and the hot compaction lock + */ + private void doubleWriteLock(TsFileResource seqFile) { + boolean fileLockGot; + boolean mergeLockGot; + boolean hotCompactionLockGot; + while (true) { + fileLockGot = seqFile.tryWriteLock(); + mergeLockGot = mergeLock.writeLock().tryLock(); + hotCompactionLockGot = tryWriteLock(); + + if (fileLockGot && mergeLockGot && hotCompactionLockGot) { + break; + } else { + // did not get all of them, release the gotten one and retry + if (hotCompactionLockGot) { + writeUnlock(); + } + if (mergeLockGot) { + mergeLock.writeLock().unlock(); + } + if (fileLockGot) { + seqFile.writeUnlock(); + } + } + } + } + + /** + * release the write locks of the resource , the merge lock and the hot compaction lock + */ + private void doubleWriteUnlock(TsFileResource seqFile) { + writeUnlock(); + mergeLock.writeLock().unlock(); + seqFile.writeUnlock(); + } + + private void removeUnseqFiles(List<TsFileResource> unseqFiles) { + mergeLock.writeLock().lock(); + writeLock(); + try { + removeAll(unseqFiles, false); + } finally { + writeUnlock(); + mergeLock.writeLock().unlock(); + } + + for (TsFileResource unseqFile : unseqFiles) { + unseqFile.writeLock(); + try { + unseqFile.remove(); + } finally { + unseqFile.writeUnlock(); + } + } + } + + @SuppressWarnings("squid:S1141") + private void updateMergeModification(TsFileResource seqFile) { + try { + // remove old modifications and write modifications generated during merge + seqFile.removeModFile(); + if (mergingModification != null) { + for (Modification modification : mergingModification.getModifications()) { + seqFile.getModFile().write(modification); + } + try { + seqFile.getModFile().close(); + } catch (IOException e) { + logger + .error("Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e); + } + } + } catch (IOException e) { + logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName, + seqFile.getTsFile(), e); + } + } + + private void removeMergingModification() { + try { + if (mergingModification != null) { + mergingModification.remove(); + mergingModification = null; + } + } catch (IOException e) { + logger.error("{} cannot remove merging modification ", storageGroupName, e); + } + } + + public void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, + File mergeLog) { + logger.info("{} a merge task is ending...", storageGroupName); + + if (unseqFiles.isEmpty()) { + // merge runtime exception arose, just end this merge + isUnseqMerging = false; + logger.info("{} a merge task abnormally ends", storageGroupName); + return; + } + + removeUnseqFiles(unseqFiles); + + for (int i = 0; i < seqFiles.size(); i++) { + TsFileResource seqFile = seqFiles.get(i); + // get both seqFile lock and merge lock + doubleWriteLock(seqFile); + + try { + updateMergeModification(seqFile); + if (i == seqFiles.size() - 1) { + //FIXME if there is an exception, the the modification file will be not closed. + removeMergingModification(); + isUnseqMerging = false; + mergeLog.delete(); + } + } finally { + doubleWriteUnlock(seqFile); + } + } + logger.info("{} a merge task ends", storageGroupName); + } + } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java index 2b852d2..8c5f482 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java @@ -35,7 +35,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; @@ -44,16 +43,23 @@ import java.util.concurrent.CopyOnWriteArrayList; import com.clearspring.analytics.stream.cardinality.HyperLogLog; import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.engine.cache.ChunkMetadataCache; -import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache; +import org.apache.iotdb.db.engine.merge.manage.MergeManager; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector; +import org.apache.iotdb.db.engine.merge.task.MergeTask; +import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement; import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer; import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger; import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils; +import org.apache.iotdb.db.exception.MergeException; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; @@ -66,21 +72,26 @@ public class LevelTsFileManagement extends TsFileManagement { private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class); private final int maxLevelNum = IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum(); + private final int maxFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig() + .getMaxFileNumInEachLevel(); + private final int maxUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig() + .getMaxUnseqLevelNum(); + private final int maxUnseqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig() + .getMaxFileNumInEachLevel(); private final int maxChunkPointNum = IoTDBDescriptor.getInstance().getConfig() .getMergeChunkPointNumberThreshold(); + private final boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig() + .isForceFullMerge(); // First map is partition list; Second list is level list; Third list is file list in level; private final Map<Long, List<TreeSet<TsFileResource>>> sequenceTsFileResources = new ConcurrentSkipListMap<>(); private final Map<Long, List<List<TsFileResource>>> unSequenceTsFileResources = new ConcurrentSkipListMap<>(); private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>(); private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>(); - // Deciding whether or not to merge; - //private boolean forkedSeqListMergeFlag = false; - //private boolean forkedUnSeqListMergeFlag = false; private double forkedSeqListPointNum = 0; - private double forkedSeqListDeviceSize = 0; + private double forkedSeqListMeasurementSize = 0; private double forkedUnSeqListPointNum = 0; - private double forkedUnSeqListDeviceSize = 0; + private double forkedUnSeqListMeasurementSize = 0; public LevelTsFileManagement(String storageGroupName, String storageGroupDir) { super(storageGroupName, storageGroupDir); @@ -94,6 +105,8 @@ public class LevelTsFileManagement extends TsFileManagement { } for (int i = 0; i < maxLevelNum; i++) { sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles); + } + for (int i = 0; i < maxUnseqLevelNum; i++) { unSequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles); } } @@ -116,7 +129,8 @@ public class LevelTsFileManagement extends TsFileManagement { List<List<TsFileResource>> currMergeFiles, HotCompactionLogger hotCompactionLogger, boolean sequence) throws IOException { TsFileResource sourceFile = currMergeFiles.get(0).get(0); - File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), maxLevelNum - 1); + File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), + sequence ? (maxLevelNum - 1) : (maxUnseqLevelNum - 1)); TsFileResource targetResource = new TsFileResource(newTargetFile); List<TsFileResource> mergeFiles = new ArrayList<>(); for (int i = currMergeFiles.size() - 1; i >= 0; i--) { @@ -128,8 +142,14 @@ public class LevelTsFileManagement extends TsFileManagement { hotCompactionLogger.logSequence(sequence); hotCompactionLogger.logFile(TARGET_NAME, newTargetFile); writeLock(); - for (int i = 0; i < maxLevelNum - 1; i++) { - deleteLevelFiles(timePartitionId, currMergeFiles.get(i)); + if (sequence) { + for (int i = 0; i < maxLevelNum - 1; i++) { + deleteLevelFiles(timePartitionId, currMergeFiles.get(i)); + } + } else { + for (int i = 0; i < maxUnseqLevelNum - 1; i++) { + deleteLevelFiles(timePartitionId, currMergeFiles.get(i)); + } } writeUnlock(); hotCompactionLogger.logMergeFinish(); @@ -144,7 +164,7 @@ public class LevelTsFileManagement extends TsFileManagement { } } else { for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) { - result.addAll(unSequenceTsFileList.get(maxLevelNum - 1)); + result.addAll(unSequenceTsFileList.get(maxUnseqLevelNum - 1)); } } return result; @@ -212,25 +232,25 @@ public class LevelTsFileManagement extends TsFileManagement { public void add(TsFileResource tsFileResource, boolean sequence) { long timePartitionId = tsFileResource.getTimePartition(); int level = getMergeLevel(tsFileResource.getTsFile()); - if (level <= maxLevelNum - 1) { - if (sequence) { + if (sequence) { + if (level <= maxLevelNum - 1) { sequenceTsFileResources .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(level) .add(tsFileResource); } else { - unSequenceTsFileResources - .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources).get(level) + sequenceTsFileResources + .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(maxLevelNum - 1) .add(tsFileResource); } } else { - if (sequence) { - sequenceTsFileResources - .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(maxLevelNum - 1) + if (level <= maxUnseqLevelNum - 1) { + unSequenceTsFileResources + .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources).get(level) .add(tsFileResource); } else { unSequenceTsFileResources .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources) - .get(maxLevelNum - 1).add(tsFileResource); + .get(maxUnseqLevelNum - 1).add(tsFileResource); } } } @@ -306,7 +326,7 @@ public class LevelTsFileManagement extends TsFileManagement { } else { for (List<List<TsFileResource>> partitionUnSequenceTsFileResource : unSequenceTsFileResources .values()) { - for (int i = maxLevelNum - 1; i >= 0; i--) { + for (int i = maxUnseqLevelNum - 1; i >= 0; i--) { result += partitionUnSequenceTsFileResource.get(i).size(); } } @@ -413,41 +433,55 @@ public class LevelTsFileManagement extends TsFileManagement { public void forkCurrentFileList(long timePartition) { Pair<Double, Double> seqResult = forkTsFileList( forkedSequenceTsFileResources, - sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources)); + sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources), + maxLevelNum); forkedSeqListPointNum = seqResult.left; - forkedSeqListDeviceSize = seqResult.right; + forkedSeqListMeasurementSize = seqResult.right; Pair<Double, Double> unSeqResult = forkTsFileList( forkedUnSequenceTsFileResources, - unSequenceTsFileResources.computeIfAbsent(timePartition, this::newUnSequenceTsFileResources)); + unSequenceTsFileResources + .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources), maxUnseqLevelNum); forkedUnSeqListPointNum = unSeqResult.left; - forkedUnSeqListDeviceSize = unSeqResult.right; + forkedUnSeqListMeasurementSize = unSeqResult.right; } private Pair<Double, Double> forkTsFileList( List<List<TsFileResource>> forkedTsFileResources, - List rawTsFileResources) { + List rawTsFileResources, int currMaxLevel) { forkedTsFileResources.clear(); // just fork part of the TsFile list, controlled by max_merge_chunk_point long pointNum = 0; // all flush to target file -// Set<String> deviceSet = new HashSet<>(); - ICardinality deviceSet = new HyperLogLog(13); - for (int i = 0; i < maxLevelNum - 1; i++) { + ICardinality measurementSet = new HyperLogLog(13); + for (int i = 0; i < currMaxLevel - 1; i++) { List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources .get(i); synchronized (levelRawTsFileResources) { for (TsFileResource tsFileResource : levelRawTsFileResources) { if (tsFileResource.isClosed()) { - Map<String, Long> chunkPointMap = FileChunkPointSizeCache.getInstance() - .get(tsFileResource.getTsFile()); - for (Entry<String, Long> deviceChunkPoint : chunkPointMap.entrySet()) { - deviceSet.offer(deviceChunkPoint.getKey()); - pointNum += deviceChunkPoint.getValue(); + String path = tsFileResource.getTsFile().getAbsolutePath(); + try { + if (tsFileResource.getTsFile().exists()) { + TsFileSequenceReader reader = new TsFileSequenceReader(path); + List<Path> pathList = reader.getAllPaths(); + for (Path sensorPath : pathList) { + measurementSet.offer(sensorPath.getFullPath()); + List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath); + for (ChunkMetadata chunkMetadata : chunkMetadataList) { + pointNum += chunkMetadata.getNumOfPoints(); + } + } + } else { + logger.info("{} tsfile does not exist", path); + } + } catch (IOException e) { + logger.error( + "{} tsfile reader creates error", path, e); } } - if (deviceSet.cardinality() > 0 - && pointNum / deviceSet.cardinality() >= maxChunkPointNum) { + if (measurementSet.cardinality() > 0 + && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { forkedLevelTsFileResources.add(tsFileResource); break; } @@ -455,112 +489,94 @@ public class LevelTsFileManagement extends TsFileManagement { } } - if (deviceSet.cardinality() > 0 - && pointNum / deviceSet.cardinality() >= maxChunkPointNum) { + if (measurementSet.cardinality() > 0 + && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { forkedTsFileResources.add(forkedLevelTsFileResources); break; } forkedTsFileResources.add(forkedLevelTsFileResources); - //System.out.println(forkedLevelTsFileResources.size()); - //System.out.println(forkedTsFileResources.get(i).size()); } // fill in empty file - while (forkedTsFileResources.size() < maxLevelNum) { + while (forkedTsFileResources.size() < currMaxLevel) { List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>(); forkedTsFileResources.add(emptyForkedLevelTsFileResources); } - //return forkedTsFileResources.size() > 1; - return new Pair<>((double)pointNum, (double)deviceSet.cardinality()); + return new Pair<>((double) pointNum, (double) measurementSet.cardinality()); } -// private Pair<Long, Set<String>> forkTsFileList( -// List<List<TsFileResource>> forkedTsFileResources, -// List rawTsFileResources) { -// forkedTsFileResources.clear(); -// // just fork part of the TsFile list, controlled by max_merge_chunk_point -// long pointNum = 0; -// // all flush to target file -// Set<String> deviceSet = new HashSet<>(); -// for (int i = 0; i < maxLevelNum - 1; i++) { -// List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); -// Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources -// .get(i); -// synchronized (levelRawTsFileResources) { -// for (TsFileResource tsFileResource : levelRawTsFileResources) { -// if (tsFileResource.isClosed()) { -// forkedLevelTsFileResources.add(tsFileResource); -// } -// } -// } -// forkedTsFileResources.add(forkedLevelTsFileResources); -// } -// return new Pair<>(pointNum, deviceSet); -// } - @Override protected void merge(long timePartition) { - merge(forkedSequenceTsFileResources, true, timePartition); - merge(forkedUnSequenceTsFileResources, false, timePartition); + merge(forkedSequenceTsFileResources, true, timePartition, maxLevelNum, maxFileNumInEachLevel); + if (maxUnseqLevelNum <= 1) { + merge(isForceFullMerge, getTsFileList(true), forkedUnSequenceTsFileResources.get(0), + Long.MAX_VALUE); + } else { + merge(forkedUnSequenceTsFileResources, false, timePartition, maxUnseqLevelNum, + maxUnseqFileNumInEachLevel); + } } @SuppressWarnings("squid:S3776") private void merge(List<List<TsFileResource>> mergeResources, boolean sequence, - long timePartition) { + long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) { long startTimeMillis = System.currentTimeMillis(); try { logger.info("{} start to filter hot compaction condition", storageGroupName); double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; - double deviceSize = - sequence ? forkedSeqListDeviceSize : forkedSeqListDeviceSize; - //boolean mergeFlag = sequence ? forkedSeqListMergeFlag : forkedUnSeqListMergeFlag; - + double measurementSize = + sequence ? forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize; logger - .info("{} current sg subLevel point num: {}, approximate device num: {}", storageGroupName, pointNum, - deviceSize); + .info("{} current sg subLevel point num: {}, approximate measurement num: {}", + storageGroupName, pointNum, + measurementSize); HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir, storageGroupName); - if (deviceSize > 0 && pointNum / deviceSize >= maxChunkPointNum) { + if (measurementSize > 0 && pointNum / measurementSize >= maxChunkPointNum) { // merge all tsfile to last level logger.info("{} merge {} level tsfiles to next level", storageGroupName, mergeResources.size()); flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); } else { - for (int i = 0; i < maxLevelNum - 1; i++) { - if (maxFileNumInEachLevel <= mergeResources.get(i).size()) { - for (TsFileResource mergeResource : mergeResources.get(i)) { - hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); - } - File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), - i + 1); - hotCompactionLogger.logSequence(sequence); - hotCompactionLogger.logFile(TARGET_NAME, newLevelFile); - logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level vm", - storageGroupName, i, mergeResources.get(i).size()); - - TsFileResource newResource = new TsFileResource(newLevelFile); - HotCompactionUtils - .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger, - new HashSet<>(), sequence); - writeLock(); - try { - deleteLevelFiles(timePartition, mergeResources.get(i)); - hotCompactionLogger.logMergeFinish(); - if (sequence) { - sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + for (int i = 0; i < currMaxLevel - 1; i++) { + if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) { + if (!sequence && i == currMaxLevel - 2) { + merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE); } else { - unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); - } - if (mergeResources.size() > i + 1) { - mergeResources.get(i + 1).add(newResource); + for (TsFileResource mergeResource : mergeResources.get(i)) { + hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); + } + File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), + i + 1); + hotCompactionLogger.logSequence(sequence); + hotCompactionLogger.logFile(TARGET_NAME, newLevelFile); + logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level vm", + storageGroupName, i, mergeResources.get(i).size()); + + TsFileResource newResource = new TsFileResource(newLevelFile); + HotCompactionUtils + .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger, + new HashSet<>(), sequence); + writeLock(); + try { + deleteLevelFiles(timePartition, mergeResources.get(i)); + hotCompactionLogger.logMergeFinish(); + if (sequence) { + sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + } else { + unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + } + if (mergeResources.size() > i + 1) { + mergeResources.get(i + 1).add(newResource); + } + } finally { + writeUnlock(); + } } - } finally { - writeUnlock(); } } } - } hotCompactionLogger.close(); File logFile = FSFactoryProducer.getFSFactory() .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME); @@ -610,7 +626,7 @@ public class LevelTsFileManagement extends TsFileManagement { private List<List<TsFileResource>> newUnSequenceTsFileResources(Long k) { List<List<TsFileResource>> newUnSequenceTsFileResources = new CopyOnWriteArrayList<>(); - for (int i = 0; i < maxLevelNum; i++) { + for (int i = 0; i < maxUnseqLevelNum; i++) { newUnSequenceTsFileResources.add(new CopyOnWriteArrayList<>()); } return newUnSequenceTsFileResources; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java index c77bad7..d03abc0 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java @@ -36,9 +36,9 @@ public class MergeManagerTest extends MergeTest { RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter(); long startTime = System.currentTimeMillis(); MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 160 * 1024 * 1024L); - assertTrue((System.currentTimeMillis() - startTime) < 1000); + assertTrue((System.currentTimeMillis() - startTime) <= 1000); MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 16 * 1024 * 1024L); - assertTrue((System.currentTimeMillis() - startTime) > 1000); + assertTrue((System.currentTimeMillis() - startTime) >= 9000); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index cfa7c64..d33c92c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.constant.TestConstant; @@ -41,6 +40,7 @@ import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; @@ -66,7 +66,6 @@ public class StorageGroupProcessorTest { private String measurementId = "s0"; private StorageGroupProcessor processor; private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT; - private AtomicLong mergeLock; @Before public void setUp() throws Exception { @@ -158,8 +157,9 @@ public class StorageGroupProcessorTest { e.printStackTrace(); } processor.syncCloseAllWorkingTsFileProcessors(); - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); + QueryDataSource queryDataSource = processor + .query(new PartialPath(deviceId), measurementId, context, + null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { Assert.assertTrue(resource.isClosed()); @@ -177,15 +177,14 @@ public class StorageGroupProcessorTest { dataTypes.add(TSDataType.INT32.ordinal()); dataTypes.add(TSDataType.INT64.ordinal()); - MeasurementMNode[] measurementMNodes = new MeasurementMNode[2]; measurementMNodes[0] = new MeasurementMNode(null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); measurementMNodes[1] = new MeasurementMNode(null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); - - InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, + InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), + measurements, dataTypes); insertTabletPlan1.setMeasurementMNodes(measurementMNodes); @@ -206,7 +205,8 @@ public class StorageGroupProcessorTest { processor.insertTablet(insertTabletPlan1); processor.asyncCloseAllWorkingTsFileProcessors(); - InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, + InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), + measurements, dataTypes); insertTabletPlan2.setMeasurementMNodes(measurementMNodes); @@ -223,8 +223,9 @@ public class StorageGroupProcessorTest { processor.asyncCloseAllWorkingTsFileProcessors(); processor.syncCloseAllWorkingTsFileProcessors(); - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); + QueryDataSource queryDataSource = processor + .query(new PartialPath(deviceId), measurementId, context, + null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(1, queryDataSource.getUnseqResources().size()); @@ -255,8 +256,9 @@ public class StorageGroupProcessorTest { processor.syncCloseAllWorkingTsFileProcessors(); - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); + QueryDataSource queryDataSource = processor + .query(new PartialPath(deviceId), measurementId, context, + null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(10, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { @@ -268,291 +270,8 @@ public class StorageGroupProcessorTest { } @Test - public void testEnableDiscardOutOfOrderDataForInsertRowPlan() - throws WriteProcessException, QueryProcessException, IllegalPathException, IOException { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - boolean defaultValue = config.isEnableDiscardOutOfOrderData(); - config.setEnableDiscardOutOfOrderData(true); - - for (int j = 21; j <= 30; j++) { - TSRecord record = new TSRecord(j, deviceId); - record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - insertToStorageGroupProcessor(record); - processor.asyncCloseAllWorkingTsFileProcessors(); - } - processor.syncCloseAllWorkingTsFileProcessors(); - - for (int j = 10; j >= 1; j--) { - TSRecord record = new TSRecord(j, deviceId); - record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - insertToStorageGroupProcessor(record); - processor.asyncCloseAllWorkingTsFileProcessors(); - } - - processor.syncCloseAllWorkingTsFileProcessors(); - - for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessor()) { - tsfileProcessor.syncFlush(); - } - - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); - Assert.assertEquals(10, queryDataSource.getSeqResources().size()); - Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); - for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); - } - for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); - } - - config.setEnableDiscardOutOfOrderData(defaultValue); - } - - @Test - public void testEnableDiscardOutOfOrderDataForInsertTablet1() - throws QueryProcessException, IllegalPathException, IOException { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData(); - long defaultTimePartition = config.getPartitionInterval(); - boolean defaultEnablePartition = config.isEnablePartition(); - config.setEnableDiscardOutOfOrderData(true); - config.setEnablePartition(true); - config.setPartitionInterval(100); - - String[] measurements = new String[2]; - measurements[0] = "s0"; - measurements[1] = "s1"; - List<Integer> dataTypes = new ArrayList<>(); - dataTypes.add(TSDataType.INT32.ordinal()); - dataTypes.add(TSDataType.INT64.ordinal()); - - MeasurementMNode[] measurementMNodes = new MeasurementMNode[2]; - measurementMNodes[0] = new MeasurementMNode(null, "s0", - new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); - measurementMNodes[1] = new MeasurementMNode(null, "s1", - new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); - - InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, - dataTypes); - - long[] times = new long[100]; - Object[] columns = new Object[2]; - columns[0] = new int[100]; - columns[1] = new long[100]; - - for (int r = 0; r < 100; r++) { - times[r] = r; - ((int[]) columns[0])[r] = 1; - ((long[]) columns[1])[r] = 1; - } - insertTabletPlan1.setTimes(times); - insertTabletPlan1.setColumns(columns); - insertTabletPlan1.setRowCount(times.length); - insertTabletPlan1.setMeasurementMNodes(measurementMNodes); - - processor.insertTablet(insertTabletPlan1); - processor.asyncCloseAllWorkingTsFileProcessors(); - - InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, - dataTypes); - - for (int r = 149; r >= 50; r--) { - times[r - 50] = r; - ((int[]) columns[0])[r - 50] = 1; - ((long[]) columns[1])[r - 50] = 1; - } - insertTabletPlan2.setTimes(times); - insertTabletPlan2.setColumns(columns); - insertTabletPlan2.setRowCount(times.length); - insertTabletPlan2.setMeasurementMNodes(measurementMNodes); - - processor.insertTablet(insertTabletPlan2); - processor.asyncCloseAllWorkingTsFileProcessors(); - processor.syncCloseAllWorkingTsFileProcessors(); - - for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessor()) { - tsfileProcessor.syncFlush(); - } - - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); - - Assert.assertEquals(2, queryDataSource.getSeqResources().size()); - Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); - for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); - } - - config.setEnableDiscardOutOfOrderData(defaultEnableDiscard); - config.setPartitionInterval(defaultTimePartition); - config.setEnablePartition(defaultEnablePartition); - } - - @Test - public void testEnableDiscardOutOfOrderDataForInsertTablet2() - throws QueryProcessException, IllegalPathException, IOException { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData(); - long defaultTimePartition = config.getPartitionInterval(); - boolean defaultEnablePartition = config.isEnablePartition(); - config.setEnableDiscardOutOfOrderData(true); - config.setEnablePartition(true); - config.setPartitionInterval(1200); - - String[] measurements = new String[2]; - measurements[0] = "s0"; - measurements[1] = "s1"; - List<Integer> dataTypes = new ArrayList<>(); - dataTypes.add(TSDataType.INT32.ordinal()); - dataTypes.add(TSDataType.INT64.ordinal()); - - MeasurementMNode[] measurementMNodes = new MeasurementMNode[2]; - measurementMNodes[0] = new MeasurementMNode(null, "s0", - new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); - measurementMNodes[1] = new MeasurementMNode(null, "s1", - new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); - - InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, - dataTypes); - - long[] times = new long[1200]; - Object[] columns = new Object[2]; - columns[0] = new int[1200]; - columns[1] = new long[1200]; - - for (int r = 0; r < 1200; r++) { - times[r] = r; - ((int[]) columns[0])[r] = 1; - ((long[]) columns[1])[r] = 1; - } - insertTabletPlan1.setTimes(times); - insertTabletPlan1.setColumns(columns); - insertTabletPlan1.setRowCount(times.length); - insertTabletPlan1.setMeasurementMNodes(measurementMNodes); - - processor.insertTablet(insertTabletPlan1); - processor.asyncCloseAllWorkingTsFileProcessors(); - - InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, - dataTypes); - - for (int r = 1249; r >= 50; r--) { - times[r - 50] = r; - ((int[]) columns[0])[r - 50] = 1; - ((long[]) columns[1])[r - 50] = 1; - } - insertTabletPlan2.setTimes(times); - insertTabletPlan2.setColumns(columns); - insertTabletPlan2.setRowCount(times.length); - insertTabletPlan2.setMeasurementMNodes(measurementMNodes); - - processor.insertTablet(insertTabletPlan2); - processor.asyncCloseAllWorkingTsFileProcessors(); - processor.syncCloseAllWorkingTsFileProcessors(); - - for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessor()) { - tsfileProcessor.syncFlush(); - } - - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); - - Assert.assertEquals(2, queryDataSource.getSeqResources().size()); - Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); - for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); - } - - config.setEnableDiscardOutOfOrderData(defaultEnableDiscard); - config.setPartitionInterval(defaultTimePartition); - config.setEnablePartition(defaultEnablePartition); - } - - @Test - public void testEnableDiscardOutOfOrderDataForInsertTablet3() - throws QueryProcessException, IllegalPathException, IOException { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData(); - long defaultTimePartition = config.getPartitionInterval(); - boolean defaultEnablePartition = config.isEnablePartition(); - config.setEnableDiscardOutOfOrderData(true); - config.setEnablePartition(true); - config.setPartitionInterval(1000); - - String[] measurements = new String[2]; - measurements[0] = "s0"; - measurements[1] = "s1"; - List<Integer> dataTypes = new ArrayList<>(); - dataTypes.add(TSDataType.INT32.ordinal()); - dataTypes.add(TSDataType.INT64.ordinal()); - - MeasurementMNode[] measurementMNodes = new MeasurementMNode[2]; - measurementMNodes[0] = new MeasurementMNode(null, "s0", - new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null); - measurementMNodes[1] = new MeasurementMNode(null, "s1", - new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null); - - InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, - dataTypes); - - long[] times = new long[1200]; - Object[] columns = new Object[2]; - columns[0] = new int[1200]; - columns[1] = new long[1200]; - - for (int r = 0; r < 1200; r++) { - times[r] = r; - ((int[]) columns[0])[r] = 1; - ((long[]) columns[1])[r] = 1; - } - insertTabletPlan1.setTimes(times); - insertTabletPlan1.setColumns(columns); - insertTabletPlan1.setRowCount(times.length); - insertTabletPlan1.setMeasurementMNodes(measurementMNodes); - - processor.insertTablet(insertTabletPlan1); - processor.asyncCloseAllWorkingTsFileProcessors(); - - InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, - dataTypes); - - for (int r = 1249; r >= 50; r--) { - times[r - 50] = r; - ((int[]) columns[0])[r - 50] = 1; - ((long[]) columns[1])[r - 50] = 1; - } - insertTabletPlan2.setTimes(times); - insertTabletPlan2.setColumns(columns); - insertTabletPlan2.setRowCount(times.length); - insertTabletPlan2.setMeasurementMNodes(measurementMNodes); - - processor.insertTablet(insertTabletPlan2); - processor.asyncCloseAllWorkingTsFileProcessors(); - processor.syncCloseAllWorkingTsFileProcessors(); - - for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessor()) { - tsfileProcessor.syncFlush(); - } - - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); - - Assert.assertEquals(2, queryDataSource.getSeqResources().size()); - Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); - for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); - } - - config.setEnableDiscardOutOfOrderData(defaultEnableDiscard); - config.setPartitionInterval(defaultTimePartition); - config.setEnablePartition(defaultEnablePartition); - } - - @Test - public void testMerge() throws WriteProcessException, QueryProcessException, IllegalPathException { - - mergeLock = new AtomicLong(0); + public void testMerge() + throws WriteProcessException, QueryProcessException, IllegalPathException { for (int j = 21; j <= 30; j++) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); @@ -570,12 +289,13 @@ public class StorageGroupProcessorTest { processor.syncCloseAllWorkingTsFileProcessors(); processor.merge(true); - while (mergeLock.get() == 0) { + while (processor.tsFileManagement.isUnseqMerging) { // wait } - QueryDataSource queryDataSource = processor.query(new PartialPath(deviceId), measurementId, context, - null, null); + QueryDataSource queryDataSource = processor + .query(new PartialPath(deviceId), measurementId, context, + null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { @@ -592,12 +312,5 @@ public class StorageGroupProcessorTest { super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy()); } - @Override - protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, - File mergeLog) { - super.mergeEndAction(seqFiles, unseqFiles, mergeLog); - mergeLock.incrementAndGet(); - assertFalse(mergeLog.exists()); - } } } \ No newline at end of file diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java index d058ba5..f65130d 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java @@ -294,7 +294,7 @@ public class IoTDBMergeTest { } // it is uncertain whether the sub tasks are created at this time point, and we are only // sure that the main task is created - assertEquals(0, cnt); + assertEquals(1, cnt); } } }
