This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new a492c687b4f Change FileTimeIndexCache to region level (#13353) (#13409)
a492c687b4f is described below
commit a492c687b4f7b1864fbbe244dbecece3b0a065d5
Author: Haonan <[email protected]>
AuthorDate: Thu Sep 5 12:19:20 2024 +0800
Change FileTimeIndexCache to region level (#13353) (#13409)
* Change FileTimeIndexCache to region level
* add log
* fix UT
* fix IT error
* fix error log
---
.../iotdb/db/storageengine/StorageEngine.java | 4 +
.../db/storageengine/dataregion/DataRegion.java | 73 ++++++-------
.../dataregion/tsfile/TsFileManager.java | 9 +-
.../dataregion/tsfile/TsFileResource.java | 12 ++-
.../timeindex/FileTimeIndexCacheRecorder.java | 118 +++++++++------------
.../FileTimeIndexCacheReader.java | 10 +-
.../FileTimeIndexCacheWriter.java | 4 +
7 files changed, 112 insertions(+), 118 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 224a20dc196..f9bcb43be60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -355,6 +355,7 @@ public class StorageEngine implements IService {
private void asyncRecoverTsFileResource() {
List<Future<Void>> futures = new LinkedList<>();
+ long startRecoverTime = System.currentTimeMillis();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
List<Callable<Void>> asyncTsFileResourceRecoverTasks =
@@ -378,6 +379,9 @@ public class StorageEngine implements IService {
checkResults(futures, "async recover tsfile resource meets
error.");
recoverRepairData();
isReadyForNonReadWriteFunctions.set(true);
+ LOGGER.info(
+ "TsFile Resource recover cost: {}s.",
+ (System.currentTimeMillis() - startRecoverTime) / 1000);
},
ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
recoverEndTrigger.start();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 232e1dd09e9..6a8a339ccc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -538,12 +538,24 @@ public class DataRegion implements IDataRegionForQuery {
latestPartitionId,
((TreeMap<Long, List<TsFileResource>>)
partitionTmpUnseqTsFiles).lastKey());
}
+ File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
"FileTimeIndexCache_0");
+ Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+ if (logFile.exists()) {
+ try {
+ FileTimeIndexCacheReader logReader =
+ new FileTimeIndexCacheReader(logFile, dataRegionId);
+ logReader.read(fileTimeIndexMap);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpSeqTsFiles.entrySet()) {
Callable<Void> asyncRecoverTask =
recoverFilesInPartition(
partitionFiles.getKey(),
dataRegionRecoveryContext,
partitionFiles.getValue(),
+ fileTimeIndexMap,
true);
if (asyncRecoverTask != null) {
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
@@ -556,6 +568,7 @@ public class DataRegion implements IDataRegionForQuery {
partitionFiles.getKey(),
dataRegionRecoveryContext,
partitionFiles.getValue(),
+ fileTimeIndexMap,
false);
if (asyncRecoverTask != null) {
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
@@ -862,45 +875,29 @@ public class DataRegion implements IDataRegionForQuery {
long partitionId,
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
+ Map<TsFileID, FileTimeIndex> fileTimeIndexMap,
boolean isSeq) {
-
- File partitionSysDir =
- SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
String.valueOf(partitionId));
- File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir,
"FileTimeIndexCache_0");
- if (logFile.exists()) {
- Map<TsFileID, FileTimeIndex> fileTimeIndexMap;
- try {
- FileTimeIndexCacheReader logReader =
- new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId);
- fileTimeIndexMap = logReader.read();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
- List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
- Callable<Void> asyncRecoverTask = null;
- for (TsFileResource tsFileResource : resourceList) {
- if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
-
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
- tsFileManager.add(tsFileResource, isSeq);
- resourceListForAsyncRecover.add(tsFileResource);
- } else {
- resourceListForSyncRecover.add(tsFileResource);
- }
- }
- if (!resourceListForAsyncRecover.isEmpty()) {
- asyncRecoverTask =
- asyncRecoverFilesInPartition(partitionId, context,
resourceListForAsyncRecover, isSeq);
- }
- if (!resourceListForSyncRecover.isEmpty()) {
- syncRecoverFilesInPartition(partitionId, context,
resourceListForSyncRecover, isSeq);
+ List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
+ List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
+ Callable<Void> asyncRecoverTask = null;
+ for (TsFileResource tsFileResource : resourceList) {
+ if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
+
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ tsFileManager.add(tsFileResource, isSeq);
+ resourceListForAsyncRecover.add(tsFileResource);
+ } else {
+ resourceListForSyncRecover.add(tsFileResource);
}
- return asyncRecoverTask;
- } else {
- syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq);
- return null;
}
+ if (!resourceListForAsyncRecover.isEmpty()) {
+ asyncRecoverTask =
+ asyncRecoverFilesInPartition(partitionId, context,
resourceListForAsyncRecover, isSeq);
+ }
+ if (!resourceListForSyncRecover.isEmpty()) {
+ syncRecoverFilesInPartition(partitionId, context,
resourceListForSyncRecover, isSeq);
+ }
+ return asyncRecoverTask;
}
private Callable<Void> asyncRecoverFilesInPartition(
@@ -3780,9 +3777,7 @@ public class DataRegion implements IDataRegionForQuery {
}
public void compactFileTimeIndexCache() {
- for (long timePartition : partitionMaxFileVersions.keySet()) {
- tsFileManager.compactFileTimeIndexCache(timePartition);
- }
+ tsFileManager.compactFileTimeIndexCache();
}
@TestOnly
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 06409cf3cf8..50aee584f9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -392,16 +392,17 @@ public class TsFileManager {
&& unsequenceFiles.higherKey(timePartitionId) == null);
}
- public void compactFileTimeIndexCache(long timePartition) {
+ public void compactFileTimeIndexCache() {
+ int currentResourceSize = size(true) + size(false);
readLock();
try {
FileTimeIndexCacheRecorder.getInstance()
.compactFileTimeIndexIfNeeded(
storageGroupName,
Integer.parseInt(dataRegionId),
- timePartition,
- sequenceFiles.get(timePartition),
- unsequenceFiles.get(timePartition));
+ currentResourceSize,
+ sequenceFiles,
+ unsequenceFiles);
} finally {
readUnlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index bc24dec1049..b7973cc03df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -294,12 +294,18 @@ public class TsFileResource {
}
public static int getFileTimeIndexSerializedSize() {
- // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp,
tsFileID.fileVersion
- // tsFileID.compactionVersion, timeIndex.getMinStartTime(),
timeIndex.getMaxStartTime()
- return 5 * Long.BYTES;
+ // 6 * 8 Byte means 6 long numbers of
+ // tsFileID.timePartitionId,
+ // tsFileID.timestamp,
+ // tsFileID.fileVersion,
+ // tsFileID.compactionVersion,
+ // timeIndex.getMinStartTime(),
+ // timeIndex.getMaxStartTime()
+ return 6 * Long.BYTES;
}
public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) {
+ buffer.putLong(tsFileID.timePartitionId);
buffer.putLong(tsFileID.timestamp);
buffer.putLong(tsFileID.fileVersion);
buffer.putLong(tsFileID.compactionVersion);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
index b0c805bba6b..d6eb597922e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
@@ -57,8 +57,7 @@ public class FileTimeIndexCacheRecorder {
private final BlockingQueue<Runnable> taskQueue = new
LinkedBlockingQueue<>();
- private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
- new ConcurrentHashMap<>();
+ private final Map<Integer, FileTimeIndexCacheWriter> writerMap = new
ConcurrentHashMap<>();
private FileTimeIndexCacheRecorder() {
recordFileIndexThread =
@@ -86,11 +85,10 @@ public class FileTimeIndexCacheRecorder {
TsFileResource firstResource = tsFileResources[0];
TsFileID tsFileID = firstResource.getTsFileID();
int dataRegionId = tsFileID.regionId;
- long partitionId = tsFileID.timePartitionId;
File dataRegionSysDir =
StorageEngine.getDataRegionSystemDir(
firstResource.getDatabaseName(),
firstResource.getDataRegionId());
- FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId,
dataRegionSysDir);
boolean result =
taskQueue.offer(
() -> {
@@ -116,18 +114,14 @@ public class FileTimeIndexCacheRecorder {
public void compactFileTimeIndexIfNeeded(
String dataBaseName,
int dataRegionId,
- long partitionId,
- TsFileResourceList sequenceFiles,
- TsFileResourceList unsequenceFiles) {
+ int currentResourceCount,
+ Map<Long, TsFileResourceList> sequenceFiles,
+ Map<Long, TsFileResourceList> unsequenceFiles) {
FileTimeIndexCacheWriter writer =
getWriter(
dataRegionId,
- partitionId,
StorageEngine.getDataRegionSystemDir(dataBaseName,
String.valueOf(dataRegionId)));
- int currentResourceCount =
- (sequenceFiles == null ? 0 : sequenceFiles.size())
- + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
if (writer.getLogFile().length()
> currentResourceCount * getFileTimeIndexSerializedSize() * 100L) {
@@ -136,25 +130,29 @@ public class FileTimeIndexCacheRecorder {
() -> {
try {
writer.clearFile();
- if (sequenceFiles != null && !sequenceFiles.isEmpty()) {
- ByteBuffer buffer =
- ByteBuffer.allocate(
- getFileTimeIndexSerializedSize() *
sequenceFiles.size());
- for (TsFileResource tsFileResource : sequenceFiles) {
-
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ for (TsFileResourceList sequenceList :
sequenceFiles.values()) {
+ if (sequenceList != null && !sequenceList.isEmpty()) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(
+ getFileTimeIndexSerializedSize() *
sequenceList.size());
+ for (TsFileResource tsFileResource : sequenceList) {
+
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
}
- buffer.flip();
- writer.write(buffer);
}
- if (unsequenceFiles != null && !unsequenceFiles.isEmpty()) {
- ByteBuffer buffer =
- ByteBuffer.allocate(
- getFileTimeIndexSerializedSize() *
unsequenceFiles.size());
- for (TsFileResource tsFileResource : unsequenceFiles) {
-
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ for (TsFileResourceList unsequenceList :
unsequenceFiles.values()) {
+ if (unsequenceList != null && !unsequenceList.isEmpty()) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(
+ getFileTimeIndexSerializedSize() *
unsequenceList.size());
+ for (TsFileResource tsFileResource : unsequenceList) {
+
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
}
- buffer.flip();
- writer.write(buffer);
}
} catch (IOException e) {
LOGGER.warn("Meet error when compact FileTimeIndexCache:
{}", e.getMessage());
@@ -166,51 +164,41 @@ public class FileTimeIndexCacheRecorder {
}
}
- private FileTimeIndexCacheWriter getWriter(
- int dataRegionId, long partitionId, File dataRegionSysDir) {
- return writerMap
- .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(
- partitionId,
- k -> {
- File partitionDir =
- SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
String.valueOf(partitionId));
- File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir,
FILE_NAME);
- try {
- if (!partitionDir.exists() && !partitionDir.mkdirs()) {
- LOGGER.debug(
- "Partition directory has existed,filePath:{}",
- partitionDir.getAbsolutePath());
- }
- if (!logFile.createNewFile()) {
- LOGGER.debug(
- "Partition log file has existed,filePath:{}",
logFile.getAbsolutePath());
- }
- return new FileTimeIndexCacheWriter(logFile, true);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ private FileTimeIndexCacheWriter getWriter(int dataRegionId, File
dataRegionSysDir) {
+ return writerMap.computeIfAbsent(
+ dataRegionId,
+ k -> {
+ File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
FILE_NAME);
+ try {
+ if (!dataRegionSysDir.exists() && !dataRegionSysDir.mkdirs()) {
+ LOGGER.debug(
+ "DataRegionSysDir has existed,filePath:{}",
dataRegionSysDir.getAbsolutePath());
+ }
+ if (!logFile.createNewFile()) {
+ LOGGER.debug("FileTimeIndex file has existed,filePath:{}",
logFile.getAbsolutePath());
+ }
+ return new FileTimeIndexCacheWriter(logFile, true);
+ } catch (IOException e) {
+ LOGGER.error(
+ "FileTimeIndex log file create filed,filePath:{}",
logFile.getAbsolutePath(), e);
+ throw new RuntimeException(e);
+ }
+ });
}
public void close() throws IOException {
- for (Map<Long, FileTimeIndexCacheWriter> partitionWriterMap :
writerMap.values()) {
- for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
- writer.close();
- }
+ for (FileTimeIndexCacheWriter writer : writerMap.values()) {
+ writer.close();
}
}
public void removeFileTimeIndexCache(int dataRegionId) {
- Map<Long, FileTimeIndexCacheWriter> partitionWriterMap =
writerMap.get(dataRegionId);
- if (partitionWriterMap != null) {
- for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
- try {
- writer.close();
- deleteDirectoryAndEmptyParent(writer.getLogFile());
- } catch (IOException e) {
- LOGGER.warn("Meet error when close FileTimeIndexCache: {}",
e.getMessage());
- }
+ for (FileTimeIndexCacheWriter writer : writerMap.values()) {
+ try {
+ writer.close();
+ deleteDirectoryAndEmptyParent(writer.getLogFile());
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when close FileTimeIndexCache: {}",
e.getMessage());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
index 35682bd8020..d07210736e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
@@ -32,7 +32,6 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
-import java.util.HashMap;
import java.util.Map;
import static
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
@@ -44,21 +43,19 @@ public class FileTimeIndexCacheReader {
private final File logFile;
private final long fileLength;
private final int dataRegionId;
- private final long partitionId;
- public FileTimeIndexCacheReader(File logFile, String dataRegionId, long
partitionId) {
+ public FileTimeIndexCacheReader(File logFile, String dataRegionId) {
this.logFile = logFile;
this.fileLength = logFile.length();
this.dataRegionId = Integer.parseInt(dataRegionId);
- this.partitionId = partitionId;
}
- public Map<TsFileID, FileTimeIndex> read() throws IOException {
- Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+ public void read(Map<TsFileID, FileTimeIndex> fileTimeIndexMap) throws
IOException {
long readLength = 0L;
try (DataInputStream logStream =
new DataInputStream(new
BufferedInputStream(Files.newInputStream(logFile.toPath())))) {
while (readLength < fileLength) {
+ long partitionId = logStream.readLong();
long timestamp = logStream.readLong();
long fileVersion = logStream.readLong();
long compactionVersion = logStream.readLong();
@@ -79,6 +76,5 @@ public class FileTimeIndexCacheReader {
channel.truncate(readLength);
}
}
- return fileTimeIndexMap;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
index dce4f8b1103..3d88575ba68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
@@ -53,6 +53,10 @@ public class FileTimeIndexCacheWriter implements ILogWriter {
public void write(ByteBuffer logBuffer) throws IOException {
try {
+ if (!this.logFile.exists()) {
+ // For UT env, logFile may not be created
+ return;
+ }
channel.write(logBuffer);
if (this.forceEachWrite) {
channel.force(true);