This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch speed_up_recover in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 41ed3907ecf168ba41ef2eac7ec39263b7eb104f Author: HTHou <[email protected]> AuthorDate: Fri Jul 26 18:59:03 2024 +0800 dev reader --- .../db/storageengine/dataregion/DataRegion.java | 8 ++-- .../db/utils/writelog/PartitionLogReader.java | 48 +++++++++++++--------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index b34be26882f..26d33a7b13f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -825,8 +825,9 @@ public class DataRegion implements IDataRegionForQuery { SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); if (logFile.exists()) { Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>(); - try (PartitionLogReader logReader = new PartitionLogReader(logFile)) { - fileTimeIndexMap = logReader.read(); + try { + PartitionLogReader logReader = new PartitionLogReader(logFile, dataRegionId, partitionId); + logReader.read(fileTimeIndexMap); } catch (Exception e) { throw new RuntimeException(e); } @@ -858,8 +859,7 @@ public class DataRegion implements IDataRegionForQuery { boolean isSeq) { for (TsFileResource tsFileResource : resourceList) { recoverSealedTsFiles(tsFileResource, context, isSeq); - PartitionLogRecorder.getInstance() - .submitTask(dataRegionSysDir, tsFileResource); + PartitionLogRecorder.getInstance().submitTask(dataRegionSysDir, tsFileResource); } if (config.isEnableSeparateData()) { if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java index c76d0076745..77682c8b053 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java @@ -19,37 +19,45 @@ package org.apache.iotdb.db.utils.writelog; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; + import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Map; -import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; -public class PartitionLogReader implements AutoCloseable { +public class PartitionLogReader { - private DataInputStream logStream; - private String filepath; + private final File logFile; + private final long fileLength; + private final int dataRegionId; + private final long partitionId; - private byte[] buffer; - - public PartitionLogReader(File logFile) throws IOException { - logStream = - new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath()))); - this.filepath = logFile.getPath(); + public PartitionLogReader(File logFile, String dataRegionId, long partitionId) + throws IOException { + this.logFile = logFile; + this.fileLength = logFile.length(); + this.dataRegionId = Integer.parseInt(dataRegionId); + this.partitionId = partitionId; } - public Map<TsFileID, FileTimeIndex> read() throws IOException { - // read the log file - // return the TsFileProcessor map - return null; - } - - @Override - public void close() throws Exception { + public void read(Map<TsFileID, FileTimeIndex> fileTimeIndexMap) throws IOException { + DataInputStream logStream = + new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath()))); + long readLength = 0L; + while (readLength < fileLength) { + long fileVersion = logStream.readLong(); + long compactionVersion = logStream.readLong(); + long minStartTime = logStream.readLong(); + long maxEndTime = logStream.readLong(); + TsFileID tsFileID = new TsFileID(dataRegionId, partitionId, fileVersion, compactionVersion); + FileTimeIndex fileTimeIndex = new FileTimeIndex(minStartTime, maxEndTime); + fileTimeIndexMap.put(tsFileID, fileTimeIndex); + readLength += 4 * Long.BYTES; + } logStream.close(); } }
