This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch speed_up_recover in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f8070e3318a021311fd846f074491ec5c22fcd2d Author: HTHou <[email protected]> AuthorDate: Fri Jul 26 16:35:41 2024 +0800 dev --- .../db/storageengine/dataregion/DataRegion.java | 38 +++++++++++++++++++--- .../tsfile/timeindex/PartitionLogRecorder.java | 15 +++++---- .../db/utils/writelog/PartitionLogReader.java | 22 ++++++++++--- .../db/utils/writelog/PartitionLogWriter.java | 5 +-- 4 files changed, 62 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 5afc6c6faa4..b34be26882f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion; -import java.io.FileNotFoundException; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -91,11 +90,14 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.PartitionLogRecorder; import org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; @@ -819,19 +821,45 @@ public class DataRegion implements IDataRegionForQuery { boolean isSeq) { // TODO: read partition file - File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); + File logFile = + SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); if (logFile.exists()) { - try { - PartitionLogReader logReader = new PartitionLogReader(logFile); - } catch (IOException e) { + Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>(); + try (PartitionLogReader logReader = new PartitionLogReader(logFile)) { + fileTimeIndexMap = logReader.read(); + } catch (Exception e) { throw new RuntimeException(e); } + for (TsFileResource tsFileResource : resourceList) { + FileTimeIndex fileTimeIndex = fileTimeIndexMap.get(tsFileResource.getTsFileID()); + if (fileTimeIndex == null) { + continue; + } + tsFileResource.setTimeIndex(fileTimeIndex); + } + } else { + syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq); } + } + private void asyncRecoverFilesInPartition( + long partitionId, + DataRegionRecoveryContext context, + List<TsFileResource> resourceList, + boolean isSeq) { + // TODO: read partition file + } + private void syncRecoverFilesInPartition( + long partitionId, + DataRegionRecoveryContext context, + List<TsFileResource> resourceList, + boolean isSeq) { for (TsFileResource tsFileResource : resourceList) { recoverSealedTsFiles(tsFileResource, context, isSeq); + PartitionLogRecorder.getInstance() + .submitTask(dataRegionSysDir, tsFileResource); } if (config.isEnableSeparateData()) { if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java index 9ba45ca2f15..b610d639ba8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java @@ -25,7 +25,10 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.utils.writelog.LogWriter; +import org.apache.iotdb.db.utils.writelog.PartitionLogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -35,9 +38,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.iotdb.db.utils.writelog.PartitionLogWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PartitionLogRecorder { @@ -76,11 +76,12 @@ public class PartitionLogRecorder { partitionId, k -> { try { - File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); + File logFile = + SystemFileFactory.INSTANCE.getFile( + dataRegionSysDir, String.valueOf(partitionId)); if (!logFile.createNewFile()) { LOGGER.warn( - "Partition log file has existed,filePath:{}", - logFile.getAbsolutePath()); + "Partition log file has existed,filePath:{}", logFile.getAbsolutePath()); } return new PartitionLogWriter(logFile, false); } catch (IOException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java index 83925cc69f2..c76d0076745 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogReader.java @@ -22,12 +22,14 @@ package org.apache.iotdb.db.utils.writelog; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; +import java.util.Map; +import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; -public class PartitionLogReader { +public class PartitionLogReader implements AutoCloseable { private DataInputStream logStream; private String filepath; @@ -35,7 +37,19 @@ public class PartitionLogReader { private byte[] buffer; public PartitionLogReader(File logFile) throws IOException { - logStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath()))); + logStream = + new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath()))); this.filepath = logFile.getPath(); } + + public Map<TsFileID, FileTimeIndex> read() throws IOException { + // read the log file + // return the TsFileProcessor map + return null; + } + + @Override + public void close() throws Exception { + logStream.close(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogWriter.java index 35213626525..af0179d3847 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/writelog/PartitionLogWriter.java @@ -18,6 +18,9 @@ */ package org.apache.iotdb.db.utils.writelog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -25,8 +28,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * LogWriter writes the binary logs into a file using FileChannel together with check sums of each
