This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch regionLevelFileTimeIndexCache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a149cee15d2e0bcab78c9cc2cba0949bf6a2904f Author: HTHou <[email protected]> AuthorDate: Fri Aug 30 11:53:47 2024 +0800 Check FileTimeIndexCache to region level --- .../db/storageengine/dataregion/DataRegion.java | 73 +++++++------ .../dataregion/tsfile/TsFileManager.java | 9 +- .../dataregion/tsfile/TsFileResource.java | 12 ++- .../timeindex/FileTimeIndexCacheRecorder.java | 113 +++++++++------------ .../FileTimeIndexCacheReader.java | 10 +- 5 files changed, 99 insertions(+), 118 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 3953fd8d1a6..09b30bf1bb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -546,12 +546,24 @@ public class DataRegion implements IDataRegionForQuery { latestPartitionId, ((TreeMap<Long, List<TsFileResource>>) partitionTmpUnseqTsFiles).lastKey()); } + File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, "FileTimeIndexCache_0"); + Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>(); + if (logFile.exists()) { + try { + FileTimeIndexCacheReader logReader = + new FileTimeIndexCacheReader(logFile, dataRegionId); + logReader.read(fileTimeIndexMap); + } catch (Exception e) { + throw new RuntimeException(e); + } + } for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) { Callable<Void> asyncRecoverTask = recoverFilesInPartition( partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), + fileTimeIndexMap, true); if (asyncRecoverTask != null) { asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); @@ -564,6 +576,7 @@ public class DataRegion implements IDataRegionForQuery { partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), + fileTimeIndexMap, false); if (asyncRecoverTask != null) { asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); @@ -870,45 +883,29 @@ public class DataRegion implements IDataRegionForQuery { long partitionId, DataRegionRecoveryContext context, List<TsFileResource> resourceList, + Map<TsFileID, FileTimeIndex> fileTimeIndexMap, boolean isSeq) { - - File partitionSysDir = - SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); - File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir, "FileTimeIndexCache_0"); - if (logFile.exists()) { - Map<TsFileID, FileTimeIndex> fileTimeIndexMap; - try { - FileTimeIndexCacheReader logReader = - new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId); - fileTimeIndexMap = logReader.read(); - } catch (Exception e) { - throw new RuntimeException(e); - } - List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>(); - List<TsFileResource> resourceListForSyncRecover = new ArrayList<>(); - Callable<Void> asyncRecoverTask = null; - for (TsFileResource tsFileResource : resourceList) { - if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) { - tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID())); - tsFileResource.setStatus(TsFileResourceStatus.NORMAL); - tsFileManager.add(tsFileResource, isSeq); - resourceListForAsyncRecover.add(tsFileResource); - } else { - resourceListForSyncRecover.add(tsFileResource); - } - } - if (!resourceListForAsyncRecover.isEmpty()) { - asyncRecoverTask = - asyncRecoverFilesInPartition(partitionId, context, resourceListForAsyncRecover, isSeq); - } - if (!resourceListForSyncRecover.isEmpty()) { - syncRecoverFilesInPartition(partitionId, context, resourceListForSyncRecover, isSeq); + List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>(); + List<TsFileResource> resourceListForSyncRecover = new ArrayList<>(); + Callable<Void> asyncRecoverTask = null; + for (TsFileResource tsFileResource : resourceList) { + if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) { + tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID())); + tsFileResource.setStatus(TsFileResourceStatus.NORMAL); + tsFileManager.add(tsFileResource, isSeq); + resourceListForAsyncRecover.add(tsFileResource); + } else { + resourceListForSyncRecover.add(tsFileResource); } - return asyncRecoverTask; - } else { - syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq); - return null; } + if (!resourceListForAsyncRecover.isEmpty()) { + asyncRecoverTask = + asyncRecoverFilesInPartition(partitionId, context, resourceListForAsyncRecover, isSeq); + } + if (!resourceListForSyncRecover.isEmpty()) { + syncRecoverFilesInPartition(partitionId, context, resourceListForSyncRecover, isSeq); + } + return asyncRecoverTask; } private Callable<Void> asyncRecoverFilesInPartition( @@ -3919,9 +3916,7 @@ public class DataRegion implements IDataRegionForQuery { } public void compactFileTimeIndexCache() { - for (long timePartition : partitionMaxFileVersions.keySet()) { - tsFileManager.compactFileTimeIndexCache(timePartition); - } + tsFileManager.compactFileTimeIndexCache(); } @TestOnly diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index 06409cf3cf8..50aee584f9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -392,16 +392,17 @@ public class TsFileManager { && unsequenceFiles.higherKey(timePartitionId) == null); } - public void compactFileTimeIndexCache(long timePartition) { + public void compactFileTimeIndexCache() { + int currentResourceSize = size(true) + size(false); readLock(); try { FileTimeIndexCacheRecorder.getInstance() .compactFileTimeIndexIfNeeded( storageGroupName, Integer.parseInt(dataRegionId), - timePartition, - sequenceFiles.get(timePartition), - unsequenceFiles.get(timePartition)); + currentResourceSize, + sequenceFiles, + unsequenceFiles); } finally { readUnlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 97dcada9984..0ee97fd2a63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -296,12 +296,18 @@ public class TsFileResource { } public static int getFileTimeIndexSerializedSize() { - // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp, tsFileID.fileVersion - // tsFileID.compactionVersion, timeIndex.getMinStartTime(), timeIndex.getMaxStartTime() - return 5 * Long.BYTES; + // 6 * 8 Byte means 6 long numbers of + // tsFileID.timePartitionId, + // tsFileID.timestamp, + // tsFileID.fileVersion, + // tsFileID.compactionVersion, + // timeIndex.getMinStartTime(), + // timeIndex.getMaxStartTime() + return 6 * Long.BYTES; } public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) { + buffer.putLong(tsFileID.timePartitionId); buffer.putLong(tsFileID.timestamp); buffer.putLong(tsFileID.fileVersion); buffer.putLong(tsFileID.compactionVersion); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java index b0c805bba6b..d9f182e8a29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java @@ -57,8 +57,7 @@ public class FileTimeIndexCacheRecorder { private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); - private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap = - new ConcurrentHashMap<>(); + private final Map<Integer, FileTimeIndexCacheWriter> writerMap = new ConcurrentHashMap<>(); private FileTimeIndexCacheRecorder() { recordFileIndexThread = @@ -86,11 +85,10 @@ public class FileTimeIndexCacheRecorder { TsFileResource firstResource = tsFileResources[0]; TsFileID tsFileID = firstResource.getTsFileID(); int dataRegionId = tsFileID.regionId; - long partitionId = tsFileID.timePartitionId; File dataRegionSysDir = StorageEngine.getDataRegionSystemDir( firstResource.getDatabaseName(), firstResource.getDataRegionId()); - FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId, dataRegionSysDir); + FileTimeIndexCacheWriter writer = getWriter(dataRegionId, dataRegionSysDir); boolean result = taskQueue.offer( () -> { @@ -116,18 +114,14 @@ public class FileTimeIndexCacheRecorder { public void compactFileTimeIndexIfNeeded( String dataBaseName, int dataRegionId, - long partitionId, - TsFileResourceList sequenceFiles, - TsFileResourceList unsequenceFiles) { + int currentResourceCount, + Map<Long, TsFileResourceList> sequenceFiles, + Map<Long, TsFileResourceList> unsequenceFiles) { FileTimeIndexCacheWriter writer = getWriter( dataRegionId, - partitionId, StorageEngine.getDataRegionSystemDir(dataBaseName, String.valueOf(dataRegionId))); - int currentResourceCount = - (sequenceFiles == null ? 0 : sequenceFiles.size()) - + (unsequenceFiles == null ? 0 : unsequenceFiles.size()); if (writer.getLogFile().length() > currentResourceCount * getFileTimeIndexSerializedSize() * 100L) { @@ -136,25 +130,29 @@ public class FileTimeIndexCacheRecorder { () -> { try { writer.clearFile(); - if (sequenceFiles != null && !sequenceFiles.isEmpty()) { - ByteBuffer buffer = - ByteBuffer.allocate( - getFileTimeIndexSerializedSize() * sequenceFiles.size()); - for (TsFileResource tsFileResource : sequenceFiles) { - tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + for (TsFileResourceList sequenceList : sequenceFiles.values()) { + if (sequenceList != null && !sequenceList.isEmpty()) { + ByteBuffer buffer = + ByteBuffer.allocate( + getFileTimeIndexSerializedSize() * sequenceList.size()); + for (TsFileResource tsFileResource : sequenceList) { + tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + } + buffer.flip(); + writer.write(buffer); } - buffer.flip(); - writer.write(buffer); } - if (unsequenceFiles != null && !unsequenceFiles.isEmpty()) { - ByteBuffer buffer = - ByteBuffer.allocate( - getFileTimeIndexSerializedSize() * unsequenceFiles.size()); - for (TsFileResource tsFileResource : unsequenceFiles) { - tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + for (TsFileResourceList unsequenceList : unsequenceFiles.values()) { + if (unsequenceList != null && !unsequenceList.isEmpty()) { + ByteBuffer buffer = + ByteBuffer.allocate( + getFileTimeIndexSerializedSize() * unsequenceList.size()); + for (TsFileResource tsFileResource : unsequenceList) { + tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + } + buffer.flip(); + writer.write(buffer); } - buffer.flip(); - writer.write(buffer); } } catch (IOException e) { LOGGER.warn("Meet error when compact FileTimeIndexCache: {}", e.getMessage()); @@ -166,51 +164,36 @@ public class FileTimeIndexCacheRecorder { } } - private FileTimeIndexCacheWriter getWriter( - int dataRegionId, long partitionId, File dataRegionSysDir) { - return writerMap - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent( - partitionId, - k -> { - File partitionDir = - SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); - File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir, FILE_NAME); - try { - if (!partitionDir.exists() && !partitionDir.mkdirs()) { - LOGGER.debug( - "Partition directory has existed,filePath:{}", - partitionDir.getAbsolutePath()); - } - if (!logFile.createNewFile()) { - LOGGER.debug( - "Partition log file has existed,filePath:{}", logFile.getAbsolutePath()); - } - return new FileTimeIndexCacheWriter(logFile, true); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + private FileTimeIndexCacheWriter getWriter(int dataRegionId, File dataRegionSysDir) { + return writerMap.computeIfAbsent( + dataRegionId, + k -> { + File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, FILE_NAME); + try { + if (!logFile.createNewFile()) { + LOGGER.debug( + "FileTimeIndex log file has existed,filePath:{}", logFile.getAbsolutePath()); + } + return new FileTimeIndexCacheWriter(logFile, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } public void close() throws IOException { - for (Map<Long, FileTimeIndexCacheWriter> partitionWriterMap : writerMap.values()) { - for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) { - writer.close(); - } + for (FileTimeIndexCacheWriter writer : writerMap.values()) { + writer.close(); } } public void removeFileTimeIndexCache(int dataRegionId) { - Map<Long, FileTimeIndexCacheWriter> partitionWriterMap = writerMap.get(dataRegionId); - if (partitionWriterMap != null) { - for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) { - try { - writer.close(); - deleteDirectoryAndEmptyParent(writer.getLogFile()); - } catch (IOException e) { - LOGGER.warn("Meet error when close FileTimeIndexCache: {}", e.getMessage()); - } + for (FileTimeIndexCacheWriter writer : writerMap.values()) { + try { + writer.close(); + deleteDirectoryAndEmptyParent(writer.getLogFile()); + } catch (IOException e) { + LOGGER.warn("Meet error when close FileTimeIndexCache: {}", e.getMessage()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java index 35682bd8020..d07210736e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.StandardOpenOption; -import java.util.HashMap; import java.util.Map; import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize; @@ -44,21 +43,19 @@ public class FileTimeIndexCacheReader { private final File logFile; private final long fileLength; private final int dataRegionId; - private final long partitionId; - public FileTimeIndexCacheReader(File logFile, String dataRegionId, long partitionId) { + public FileTimeIndexCacheReader(File logFile, String dataRegionId) { this.logFile = logFile; this.fileLength = logFile.length(); this.dataRegionId = Integer.parseInt(dataRegionId); - this.partitionId = partitionId; } - public Map<TsFileID, FileTimeIndex> read() throws IOException { - Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>(); + public void read(Map<TsFileID, FileTimeIndex> fileTimeIndexMap) throws IOException { long readLength = 0L; try (DataInputStream logStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath())))) { while (readLength < fileLength) { + long partitionId = logStream.readLong(); long timestamp = logStream.readLong(); long fileVersion = logStream.readLong(); long compactionVersion = logStream.readLong(); @@ -79,6 +76,5 @@ public class FileTimeIndexCacheReader { channel.truncate(readLength); } } - return fileTimeIndexMap; } }
