This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-5869 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 299a380d9cdc39e8253e54417adb2c6c70efb953 Author: yschengzi <[email protected]> AuthorDate: Sun May 14 14:41:57 2023 +0800 [IOTDB-5869] Load strategy: load all files to unsequence dir (#9837) --- .../org/apache/iotdb/it/utils/TsFileGenerator.java | 42 ++++ .../iotdb/db/engine/storagegroup/DataRegion.java | 268 +++------------------ 2 files changed, 75 insertions(+), 235 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java index c790e0b92fc..3d399862c23 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java @@ -129,6 +129,48 @@ public class TsFileGenerator implements AutoCloseable { logger.info(String.format("Write %d points into device %s", number, device)); } + public void generateData( + String device, int number, long timeGap, boolean isAligned, long startTimestamp) + throws IOException, WriteProcessException { + List<MeasurementSchema> schemas = device2MeasurementSchema.get(device); + TreeSet<Long> timeSet = device2TimeSet.get(device); + Tablet tablet = new Tablet(device, schemas); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + long sensorNum = schemas.size(); + long startTime = startTimestamp; + + for (long r = 0; r < number; r++) { + int row = tablet.rowSize++; + startTime += timeGap; + timestamps[row] = startTime; + timeSet.add(startTime); + for (int i = 0; i < sensorNum; i++) { + generateDataPoint(values[i], row, schemas.get(i)); + } + // write + if (tablet.rowSize == tablet.getMaxRowNumber()) { + if (!isAligned) { + writer.write(tablet); + } else { + writer.writeAligned(tablet); + } + tablet.reset(); + } + } + // write + if (tablet.rowSize != 0) { + if (!isAligned) { + writer.write(tablet); + } else { + writer.writeAligned(tablet); + } + tablet.reset(); + } + + logger.info(String.format("Write %d points into device %s", number, device)); + } + private void generateDataPoint(Object obj, int row, MeasurementSchema schema) { switch (schema.getType()) { case INT32: diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index e609ddc4c11..a5a3d7e20be 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -2350,16 +2350,9 @@ public class DataRegion implements IDataRegionForQuery { } /** - * Load a new tsfile to database processor. Tne file may have overlap with other files. + * Load a new tsfile to unsequence dir. * - * <p>that there has no file which is overlapping with the new file. - * - * <p>Firstly, determine the loading type of the file, whether it needs to be loaded in sequence - * list or unsequence list. - * - * <p>Secondly, execute the loading process by the type. - * - * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice. + * <p>Then, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice. * * @param newTsFileResource tsfile resource @UsedBy load external tsfile module * @param deleteOriginFile whether to delete origin tsfile @@ -2370,43 +2363,31 @@ public class DataRegion implements IDataRegionForQuery { long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); writeLock("loadNewTsFile"); try { - List<TsFileResource> sequenceList = - tsFileManager.getOrCreateSequenceListByTimePartition(newFilePartitionId); - - int insertPos = findInsertionPosition(newTsFileResource, sequenceList); - LoadTsFileType tsFileType = getLoadingTsFileType(insertPos, sequenceList); - String renameInfo = - (tsFileType == LoadTsFileType.LOAD_SEQUENCE) - ? IoTDBConstant.SEQUENCE_FLODER_NAME - : IoTDBConstant.UNSEQUENCE_FLODER_NAME; - newTsFileResource.setSeq(tsFileType == LoadTsFileType.LOAD_SEQUENCE); + newTsFileResource.setSeq(false); String newFileName = - getLoadingTsFileName(tsFileType, insertPos, newTsFileResource, sequenceList); + getNewTsFileName( + System.currentTimeMillis(), + getAndSetNewVersion(newFilePartitionId, newTsFileResource), + 0, + 0); if (!newFileName.equals(tsfileToBeInserted.getName())) { logger.info( - "TsFile {} must be renamed to {} for loading into the " + renameInfo + " list.", + "TsFile {} must be renamed to {} for loading into the unsequence list.", tsfileToBeInserted.getName(), newFileName); newTsFileResource.setFile( fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName)); } - loadTsFileByType( - tsFileType, - tsfileToBeInserted, - newTsFileResource, - newFilePartitionId, - insertPos, - deleteOriginFile); - TsFileMetricManager.getInstance() - .addFile( - newTsFileResource.getTsFile().length(), tsFileType == LoadTsFileType.LOAD_SEQUENCE); + loadTsFileToUnSequence( + tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile); + TsFileMetricManager.getInstance().addFile(newTsFileResource.getTsFile().length(), false); resetLastCacheWhenLoadingTsFile(); // update last cache updateLastFlushTime(newTsFileResource); // update last flush time long partitionNum = newTsFileResource.getTimePartition(); updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion()); - logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo); + logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName); } catch (DiskSpaceInsufficientException e) { logger.error( "Failed to append the tsfile {} to database processor {} because the disk space is insufficient.", @@ -2437,104 +2418,6 @@ public class DataRegion implements IDataRegionForQuery { return Math.max(oldVersion, newVersion); } - private Long getTsFileResourceEstablishTime(TsFileResource tsFileResource) { - String tsFileName = tsFileResource.getTsFile().getName(); - return Long.parseLong(tsFileName.split(FILE_NAME_SEPARATOR)[0]); - } - - private LoadTsFileType getLoadingTsFileType(int insertPos, List<TsFileResource> sequenceList) { - if (insertPos == POS_OVERLAP) { - return LoadTsFileType.LOAD_UNSEQUENCE; - } - if (insertPos == sequenceList.size() - 1) { - return LoadTsFileType.LOAD_SEQUENCE; - } - long preTime = - (insertPos == -1) ? 0 : getTsFileResourceEstablishTime(sequenceList.get(insertPos)); - long subsequenceTime = getTsFileResourceEstablishTime(sequenceList.get(insertPos + 1)); - return preTime == subsequenceTime - ? LoadTsFileType.LOAD_UNSEQUENCE - : LoadTsFileType.LOAD_SEQUENCE; - } - - /** - * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them. - * - * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted - * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new - * file can be inserted between [i, i+1] - */ - private int findInsertionPosition( - TsFileResource newTsFileResource, List<TsFileResource> sequenceList) { - - int insertPos = -1; - - // find the position where the new file should be inserted - for (int i = 0; i < sequenceList.size(); i++) { - TsFileResource localFile = sequenceList.get(i); - - if (!localFile.isClosed() && localFile.getProcessor() != null) { - // we cannot compare two files by TsFileResource unless they are both closed - syncCloseOneTsFileProcessor(true, localFile.getProcessor()); - } - int fileComparison = compareTsFileDevices(newTsFileResource, localFile); - switch (fileComparison) { - case 0: - // some devices are newer but some devices are older, the two files overlap in general - return POS_OVERLAP; - case -1: - // all devices in localFile are newer than the new file, the new file can be - // inserted before localFile - return i - 1; - default: - // all devices in the local file are older than the new file, proceed to the next file - insertPos = i; - } - } - return insertPos; - } - - /** - * Compare each device in the two files to find the time relation of them. - * - * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than - * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A) - */ - private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) { - boolean hasPre = false, hasSubsequence = false; - Set<String> fileADevices = fileA.getDevices(); - Set<String> fileBDevices = fileB.getDevices(); - for (String device : fileADevices) { - if (!fileBDevices.contains(device)) { - continue; - } - long startTimeA = fileA.getStartTime(device); - long endTimeA = fileA.getEndTime(device); - long startTimeB = fileB.getStartTime(device); - long endTimeB = fileB.getEndTime(device); - if (startTimeA > endTimeB) { - // A's data of the device is later than to the B's data - hasPre = true; - } else if (startTimeB > endTimeA) { - // A's data of the device is previous to the B's data - hasSubsequence = true; - } else { - // the two files overlap in the device - return 0; - } - } - if (hasPre && hasSubsequence) { - // some devices are newer but some devices are older, the two files overlap in general - return 0; - } - if (!hasPre && hasSubsequence) { - // all devices in B are newer than those in A - return -1; - } - // all devices in B are older than those in A - return 1; - } - /** * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to * reduce unnecessary merge. Only used when the file sender and the receiver share the same file @@ -2614,50 +2497,6 @@ public class DataRegion implements IDataRegionForQuery { tsFileResource.remove(); } - /** - * Get an appropriate filename to ensure the order between files. The tsfile is named after - * ({systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile). - * - * <p>The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the - * list based on the file name and ensure the correctness of the order, so there are three cases. - * - * <p>1. The tsfile is to be inserted in the first place of the list. Timestamp can be set to half - * of the timestamp value in the file name of the first tsfile in the list , and the version - * number will be updated to the largest number in this time partition. - * - * <p>2. The tsfile is to be inserted in the last place of the list. The file name is generated by - * the system according to the naming rules and returned. - * - * <p>3. This file is inserted between two files. The time stamp is the mean of the timestamps of - * the two files, the version number will be updated to the largest number in this time partition. - * - * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex + - * 1] - * @return appropriate filename - */ - private String getLoadingTsFileName( - LoadTsFileType tsFileType, - int insertIndex, - TsFileResource newTsFileResource, - List<TsFileResource> sequenceList) { - long timePartitionId = newTsFileResource.getTimePartition(); - if (tsFileType == LoadTsFileType.LOAD_UNSEQUENCE || insertIndex == sequenceList.size() - 1) { - return getNewTsFileName( - System.currentTimeMillis(), - getAndSetNewVersion(timePartitionId, newTsFileResource), - 0, - 0); - } - - long preTime = - (insertIndex == -1) ? 0 : getTsFileResourceEstablishTime(sequenceList.get(insertIndex)); - long subsequenceTime = getTsFileResourceEstablishTime(sequenceList.get(insertIndex + 1)); - long meanTime = preTime + ((subsequenceTime - preTime) >> 1); - - return getNewTsFileName( - meanTime, getAndSetNewVersion(timePartitionId, newTsFileResource), 0, 0); - } - private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) { long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1; partitionMaxFileVersions.put(timePartitionId, version); @@ -2681,74 +2520,38 @@ public class DataRegion implements IDataRegionForQuery { /** * Execute the loading process by the type. * - * @param type load type * @param tsFileResource tsfile resource to be loaded * @param filePartitionId the partition id of the new file * @param deleteOriginFile whether to delete the original file * @return load the file successfully @UsedBy sync module, load external tsfile module. */ - private boolean loadTsFileByType( - LoadTsFileType type, + private boolean loadTsFileToUnSequence( File tsFileToLoad, TsFileResource tsFileResource, long filePartitionId, - int insertPos, boolean deleteOriginFile) throws LoadFileException, DiskSpaceInsufficientException { File targetFile; - switch (type) { - case LOAD_UNSEQUENCE: - targetFile = - fsFactory.getFile( - DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), - databaseName - + File.separatorChar - + dataRegionId - + File.separatorChar - + filePartitionId - + File.separator - + tsFileResource.getTsFile().getName()); - tsFileResource.setFile(targetFile); - if (tsFileManager.contains(tsFileResource, false)) { - logger.error("The file {} has already been loaded in unsequence list", tsFileResource); - return false; - } - tsFileManager.add(tsFileResource, false); - logger.info( - "Load tsfile in unsequence list, move file from {} to {}", - tsFileToLoad.getAbsolutePath(), - targetFile.getAbsolutePath()); - break; - case LOAD_SEQUENCE: - targetFile = - fsFactory.getFile( - DirectoryManager.getInstance().getNextFolderForSequenceFile(), - databaseName - + File.separatorChar - + dataRegionId - + File.separatorChar - + filePartitionId - + File.separator - + tsFileResource.getTsFile().getName()); - tsFileResource.setFile(targetFile); - if (tsFileManager.contains(tsFileResource, true)) { - logger.error("The file {} has already been loaded in sequence list", tsFileResource); - return false; - } - if (insertPos == -1) { - tsFileManager.insertToPartitionFileList(tsFileResource, filePartitionId, true, 0); - } else { - tsFileManager.insertToPartitionFileList( - tsFileResource, filePartitionId, true, insertPos + 1); - } - logger.info( - "Load tsfile in sequence list, move file from {} to {}", - tsFileToLoad.getAbsolutePath(), - targetFile.getAbsolutePath()); - break; - default: - throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type)); + targetFile = + fsFactory.getFile( + DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), + databaseName + + File.separatorChar + + dataRegionId + + File.separatorChar + + filePartitionId + + File.separator + + tsFileResource.getTsFile().getName()); + tsFileResource.setFile(targetFile); + if (tsFileManager.contains(tsFileResource, false)) { + logger.error("The file {} has already been loaded in unsequence list", tsFileResource); + return false; } + tsFileManager.add(tsFileResource, false); + logger.info( + "Load tsfile in unsequence list, move file from {} to {}", + tsFileToLoad.getAbsolutePath(), + targetFile.getAbsolutePath()); // move file from sync dir to data dir if (!targetFile.getParentFile().exists()) { @@ -3376,11 +3179,6 @@ public class DataRegion implements IDataRegionForQuery { this.tsFileManager.setAllowCompaction(allowCompaction); } - private enum LoadTsFileType { - LOAD_SEQUENCE, - LOAD_UNSEQUENCE - } - @FunctionalInterface public interface CloseTsFileCallBack {
