This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch regionLevelFileTimeIndexCache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a149cee15d2e0bcab78c9cc2cba0949bf6a2904f
Author: HTHou <[email protected]>
AuthorDate: Fri Aug 30 11:53:47 2024 +0800

    Check FileTimeIndexCache to region level
---
 .../db/storageengine/dataregion/DataRegion.java    |  73 +++++++------
 .../dataregion/tsfile/TsFileManager.java           |   9 +-
 .../dataregion/tsfile/TsFileResource.java          |  12 ++-
 .../timeindex/FileTimeIndexCacheRecorder.java      | 113 +++++++++------------
 .../FileTimeIndexCacheReader.java                  |  10 +-
 5 files changed, 99 insertions(+), 118 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3953fd8d1a6..09b30bf1bb6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -546,12 +546,24 @@ public class DataRegion implements IDataRegionForQuery {
                   latestPartitionId,
                   ((TreeMap<Long, List<TsFileResource>>) 
partitionTmpUnseqTsFiles).lastKey());
         }
+        File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
"FileTimeIndexCache_0");
+        Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+        if (logFile.exists()) {
+          try {
+            FileTimeIndexCacheReader logReader =
+                new FileTimeIndexCacheReader(logFile, dataRegionId);
+            logReader.read(fileTimeIndexMap);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
         for (Entry<Long, List<TsFileResource>> partitionFiles : 
partitionTmpSeqTsFiles.entrySet()) {
           Callable<Void> asyncRecoverTask =
               recoverFilesInPartition(
                   partitionFiles.getKey(),
                   dataRegionRecoveryContext,
                   partitionFiles.getValue(),
+                  fileTimeIndexMap,
                   true);
           if (asyncRecoverTask != null) {
             asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
@@ -564,6 +576,7 @@ public class DataRegion implements IDataRegionForQuery {
                   partitionFiles.getKey(),
                   dataRegionRecoveryContext,
                   partitionFiles.getValue(),
+                  fileTimeIndexMap,
                   false);
           if (asyncRecoverTask != null) {
             asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
@@ -870,45 +883,29 @@ public class DataRegion implements IDataRegionForQuery {
       long partitionId,
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
+      Map<TsFileID, FileTimeIndex> fileTimeIndexMap,
       boolean isSeq) {
-
-    File partitionSysDir =
-        SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
String.valueOf(partitionId));
-    File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir, 
"FileTimeIndexCache_0");
-    if (logFile.exists()) {
-      Map<TsFileID, FileTimeIndex> fileTimeIndexMap;
-      try {
-        FileTimeIndexCacheReader logReader =
-            new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId);
-        fileTimeIndexMap = logReader.read();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
-      List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
-      Callable<Void> asyncRecoverTask = null;
-      for (TsFileResource tsFileResource : resourceList) {
-        if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
-          
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
-          tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-          tsFileManager.add(tsFileResource, isSeq);
-          resourceListForAsyncRecover.add(tsFileResource);
-        } else {
-          resourceListForSyncRecover.add(tsFileResource);
-        }
-      }
-      if (!resourceListForAsyncRecover.isEmpty()) {
-        asyncRecoverTask =
-            asyncRecoverFilesInPartition(partitionId, context, 
resourceListForAsyncRecover, isSeq);
-      }
-      if (!resourceListForSyncRecover.isEmpty()) {
-        syncRecoverFilesInPartition(partitionId, context, 
resourceListForSyncRecover, isSeq);
+    List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
+    List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
+    Callable<Void> asyncRecoverTask = null;
+    for (TsFileResource tsFileResource : resourceList) {
+      if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
+        
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
+        tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+        tsFileManager.add(tsFileResource, isSeq);
+        resourceListForAsyncRecover.add(tsFileResource);
+      } else {
+        resourceListForSyncRecover.add(tsFileResource);
       }
-      return asyncRecoverTask;
-    } else {
-      syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq);
-      return null;
     }
+    if (!resourceListForAsyncRecover.isEmpty()) {
+      asyncRecoverTask =
+          asyncRecoverFilesInPartition(partitionId, context, 
resourceListForAsyncRecover, isSeq);
+    }
+    if (!resourceListForSyncRecover.isEmpty()) {
+      syncRecoverFilesInPartition(partitionId, context, 
resourceListForSyncRecover, isSeq);
+    }
+    return asyncRecoverTask;
   }
 
   private Callable<Void> asyncRecoverFilesInPartition(
@@ -3919,9 +3916,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public void compactFileTimeIndexCache() {
-    for (long timePartition : partitionMaxFileVersions.keySet()) {
-      tsFileManager.compactFileTimeIndexCache(timePartition);
-    }
+    tsFileManager.compactFileTimeIndexCache();
   }
 
   @TestOnly
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 06409cf3cf8..50aee584f9c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -392,16 +392,17 @@ public class TsFileManager {
         && unsequenceFiles.higherKey(timePartitionId) == null);
   }
 
-  public void compactFileTimeIndexCache(long timePartition) {
+  public void compactFileTimeIndexCache() {
+    int currentResourceSize = size(true) + size(false);
     readLock();
     try {
       FileTimeIndexCacheRecorder.getInstance()
           .compactFileTimeIndexIfNeeded(
               storageGroupName,
               Integer.parseInt(dataRegionId),
-              timePartition,
-              sequenceFiles.get(timePartition),
-              unsequenceFiles.get(timePartition));
+              currentResourceSize,
+              sequenceFiles,
+              unsequenceFiles);
     } finally {
       readUnlock();
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 97dcada9984..0ee97fd2a63 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -296,12 +296,18 @@ public class TsFileResource {
   }
 
   public static int getFileTimeIndexSerializedSize() {
-    // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp, 
tsFileID.fileVersion
-    // tsFileID.compactionVersion, timeIndex.getMinStartTime(), 
timeIndex.getMaxStartTime()
-    return 5 * Long.BYTES;
+    // 6 * 8 Byte means 6 long numbers of
+    // tsFileID.timePartitionId,
+    // tsFileID.timestamp,
+    // tsFileID.fileVersion,
+    // tsFileID.compactionVersion,
+    // timeIndex.getMinStartTime(),
+    // timeIndex.getMaxStartTime()
+    return 6 * Long.BYTES;
   }
 
   public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) {
+    buffer.putLong(tsFileID.timePartitionId);
     buffer.putLong(tsFileID.timestamp);
     buffer.putLong(tsFileID.fileVersion);
     buffer.putLong(tsFileID.compactionVersion);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
index b0c805bba6b..d9f182e8a29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
@@ -57,8 +57,7 @@ public class FileTimeIndexCacheRecorder {
 
   private final BlockingQueue<Runnable> taskQueue = new 
LinkedBlockingQueue<>();
 
-  private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
-      new ConcurrentHashMap<>();
+  private final Map<Integer, FileTimeIndexCacheWriter> writerMap = new 
ConcurrentHashMap<>();
 
   private FileTimeIndexCacheRecorder() {
     recordFileIndexThread =
@@ -86,11 +85,10 @@ public class FileTimeIndexCacheRecorder {
       TsFileResource firstResource = tsFileResources[0];
       TsFileID tsFileID = firstResource.getTsFileID();
       int dataRegionId = tsFileID.regionId;
-      long partitionId = tsFileID.timePartitionId;
       File dataRegionSysDir =
           StorageEngine.getDataRegionSystemDir(
               firstResource.getDatabaseName(), 
firstResource.getDataRegionId());
-      FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId, 
dataRegionSysDir);
+      FileTimeIndexCacheWriter writer = getWriter(dataRegionId, 
dataRegionSysDir);
       boolean result =
           taskQueue.offer(
               () -> {
@@ -116,18 +114,14 @@ public class FileTimeIndexCacheRecorder {
   public void compactFileTimeIndexIfNeeded(
       String dataBaseName,
       int dataRegionId,
-      long partitionId,
-      TsFileResourceList sequenceFiles,
-      TsFileResourceList unsequenceFiles) {
+      int currentResourceCount,
+      Map<Long, TsFileResourceList> sequenceFiles,
+      Map<Long, TsFileResourceList> unsequenceFiles) {
     FileTimeIndexCacheWriter writer =
         getWriter(
             dataRegionId,
-            partitionId,
             StorageEngine.getDataRegionSystemDir(dataBaseName, 
String.valueOf(dataRegionId)));
 
-    int currentResourceCount =
-        (sequenceFiles == null ? 0 : sequenceFiles.size())
-            + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
     if (writer.getLogFile().length()
         > currentResourceCount * getFileTimeIndexSerializedSize() * 100L) {
 
@@ -136,25 +130,29 @@ public class FileTimeIndexCacheRecorder {
               () -> {
                 try {
                   writer.clearFile();
-                  if (sequenceFiles != null && !sequenceFiles.isEmpty()) {
-                    ByteBuffer buffer =
-                        ByteBuffer.allocate(
-                            getFileTimeIndexSerializedSize() * 
sequenceFiles.size());
-                    for (TsFileResource tsFileResource : sequenceFiles) {
-                      
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                  for (TsFileResourceList sequenceList : 
sequenceFiles.values()) {
+                    if (sequenceList != null && !sequenceList.isEmpty()) {
+                      ByteBuffer buffer =
+                          ByteBuffer.allocate(
+                              getFileTimeIndexSerializedSize() * 
sequenceList.size());
+                      for (TsFileResource tsFileResource : sequenceList) {
+                        
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                      }
+                      buffer.flip();
+                      writer.write(buffer);
                     }
-                    buffer.flip();
-                    writer.write(buffer);
                   }
-                  if (unsequenceFiles != null && !unsequenceFiles.isEmpty()) {
-                    ByteBuffer buffer =
-                        ByteBuffer.allocate(
-                            getFileTimeIndexSerializedSize() * 
unsequenceFiles.size());
-                    for (TsFileResource tsFileResource : unsequenceFiles) {
-                      
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                  for (TsFileResourceList unsequenceList : 
unsequenceFiles.values()) {
+                    if (unsequenceList != null && !unsequenceList.isEmpty()) {
+                      ByteBuffer buffer =
+                          ByteBuffer.allocate(
+                              getFileTimeIndexSerializedSize() * 
unsequenceList.size());
+                      for (TsFileResource tsFileResource : unsequenceList) {
+                        
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                      }
+                      buffer.flip();
+                      writer.write(buffer);
                     }
-                    buffer.flip();
-                    writer.write(buffer);
                   }
                 } catch (IOException e) {
                   LOGGER.warn("Meet error when compact FileTimeIndexCache: 
{}", e.getMessage());
@@ -166,51 +164,36 @@ public class FileTimeIndexCacheRecorder {
     }
   }
 
-  private FileTimeIndexCacheWriter getWriter(
-      int dataRegionId, long partitionId, File dataRegionSysDir) {
-    return writerMap
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(
-            partitionId,
-            k -> {
-              File partitionDir =
-                  SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
String.valueOf(partitionId));
-              File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir, 
FILE_NAME);
-              try {
-                if (!partitionDir.exists() && !partitionDir.mkdirs()) {
-                  LOGGER.debug(
-                      "Partition directory has existed,filePath:{}",
-                      partitionDir.getAbsolutePath());
-                }
-                if (!logFile.createNewFile()) {
-                  LOGGER.debug(
-                      "Partition log file has existed,filePath:{}", 
logFile.getAbsolutePath());
-                }
-                return new FileTimeIndexCacheWriter(logFile, true);
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            });
+  private FileTimeIndexCacheWriter getWriter(int dataRegionId, File 
dataRegionSysDir) {
+    return writerMap.computeIfAbsent(
+        dataRegionId,
+        k -> {
+          File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
FILE_NAME);
+          try {
+            if (!logFile.createNewFile()) {
+              LOGGER.debug(
+                  "FileTimeIndex log file has existed,filePath:{}", 
logFile.getAbsolutePath());
+            }
+            return new FileTimeIndexCacheWriter(logFile, true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   public void close() throws IOException {
-    for (Map<Long, FileTimeIndexCacheWriter> partitionWriterMap : 
writerMap.values()) {
-      for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
-        writer.close();
-      }
+    for (FileTimeIndexCacheWriter writer : writerMap.values()) {
+      writer.close();
     }
   }
 
   public void removeFileTimeIndexCache(int dataRegionId) {
-    Map<Long, FileTimeIndexCacheWriter> partitionWriterMap = 
writerMap.get(dataRegionId);
-    if (partitionWriterMap != null) {
-      for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
-        try {
-          writer.close();
-          deleteDirectoryAndEmptyParent(writer.getLogFile());
-        } catch (IOException e) {
-          LOGGER.warn("Meet error when close FileTimeIndexCache: {}", 
e.getMessage());
-        }
+    for (FileTimeIndexCacheWriter writer : writerMap.values()) {
+      try {
+        writer.close();
+        deleteDirectoryAndEmptyParent(writer.getLogFile());
+      } catch (IOException e) {
+        LOGGER.warn("Meet error when close FileTimeIndexCache: {}", 
e.getMessage());
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
index 35682bd8020..d07210736e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
@@ -44,21 +43,19 @@ public class FileTimeIndexCacheReader {
   private final File logFile;
   private final long fileLength;
   private final int dataRegionId;
-  private final long partitionId;
 
-  public FileTimeIndexCacheReader(File logFile, String dataRegionId, long 
partitionId) {
+  public FileTimeIndexCacheReader(File logFile, String dataRegionId) {
     this.logFile = logFile;
     this.fileLength = logFile.length();
     this.dataRegionId = Integer.parseInt(dataRegionId);
-    this.partitionId = partitionId;
   }
 
-  public Map<TsFileID, FileTimeIndex> read() throws IOException {
-    Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+  public void read(Map<TsFileID, FileTimeIndex> fileTimeIndexMap) throws 
IOException {
     long readLength = 0L;
     try (DataInputStream logStream =
         new DataInputStream(new 
BufferedInputStream(Files.newInputStream(logFile.toPath())))) {
       while (readLength < fileLength) {
+        long partitionId = logStream.readLong();
         long timestamp = logStream.readLong();
         long fileVersion = logStream.readLong();
         long compactionVersion = logStream.readLong();
@@ -79,6 +76,5 @@ public class FileTimeIndexCacheReader {
         channel.truncate(readLength);
       }
     }
-    return fileTimeIndexMap;
   }
 }

Reply via email to