This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new a492c687b4f Change FileTimeIndexCache to region level (#13353) (#13409)
a492c687b4f is described below

commit a492c687b4f7b1864fbbe244dbecece3b0a065d5
Author: Haonan <[email protected]>
AuthorDate: Thu Sep 5 12:19:20 2024 +0800

    Change FileTimeIndexCache to region level (#13353) (#13409)
    
    * Check FileTimeIndexCache to region level
    
    * add log
    
    * fix ut
    
    * fix it error
    
    * fix error log
---
 .../iotdb/db/storageengine/StorageEngine.java      |   4 +
 .../db/storageengine/dataregion/DataRegion.java    |  73 ++++++-------
 .../dataregion/tsfile/TsFileManager.java           |   9 +-
 .../dataregion/tsfile/TsFileResource.java          |  12 ++-
 .../timeindex/FileTimeIndexCacheRecorder.java      | 118 +++++++++------------
 .../FileTimeIndexCacheReader.java                  |  10 +-
 .../FileTimeIndexCacheWriter.java                  |   4 +
 7 files changed, 112 insertions(+), 118 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 224a20dc196..f9bcb43be60 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -355,6 +355,7 @@ public class StorageEngine implements IService {
 
   private void asyncRecoverTsFileResource() {
     List<Future<Void>> futures = new LinkedList<>();
+    long startRecoverTime = System.currentTimeMillis();
     for (DataRegion dataRegion : dataRegionMap.values()) {
       if (dataRegion != null) {
         List<Callable<Void>> asyncTsFileResourceRecoverTasks =
@@ -378,6 +379,9 @@ public class StorageEngine implements IService {
               checkResults(futures, "async recover tsfile resource meets 
error.");
               recoverRepairData();
               isReadyForNonReadWriteFunctions.set(true);
+              LOGGER.info(
+                  "TsFile Resource recover cost: {}s.",
+                  (System.currentTimeMillis() - startRecoverTime) / 1000);
             },
             ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
     recoverEndTrigger.start();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 232e1dd09e9..6a8a339ccc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -538,12 +538,24 @@ public class DataRegion implements IDataRegionForQuery {
                   latestPartitionId,
                   ((TreeMap<Long, List<TsFileResource>>) 
partitionTmpUnseqTsFiles).lastKey());
         }
+        File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
"FileTimeIndexCache_0");
+        Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+        if (logFile.exists()) {
+          try {
+            FileTimeIndexCacheReader logReader =
+                new FileTimeIndexCacheReader(logFile, dataRegionId);
+            logReader.read(fileTimeIndexMap);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
         for (Entry<Long, List<TsFileResource>> partitionFiles : 
partitionTmpSeqTsFiles.entrySet()) {
           Callable<Void> asyncRecoverTask =
               recoverFilesInPartition(
                   partitionFiles.getKey(),
                   dataRegionRecoveryContext,
                   partitionFiles.getValue(),
+                  fileTimeIndexMap,
                   true);
           if (asyncRecoverTask != null) {
             asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
@@ -556,6 +568,7 @@ public class DataRegion implements IDataRegionForQuery {
                   partitionFiles.getKey(),
                   dataRegionRecoveryContext,
                   partitionFiles.getValue(),
+                  fileTimeIndexMap,
                   false);
           if (asyncRecoverTask != null) {
             asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
@@ -862,45 +875,29 @@ public class DataRegion implements IDataRegionForQuery {
       long partitionId,
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
+      Map<TsFileID, FileTimeIndex> fileTimeIndexMap,
       boolean isSeq) {
-
-    File partitionSysDir =
-        SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
String.valueOf(partitionId));
-    File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir, 
"FileTimeIndexCache_0");
-    if (logFile.exists()) {
-      Map<TsFileID, FileTimeIndex> fileTimeIndexMap;
-      try {
-        FileTimeIndexCacheReader logReader =
-            new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId);
-        fileTimeIndexMap = logReader.read();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
-      List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
-      Callable<Void> asyncRecoverTask = null;
-      for (TsFileResource tsFileResource : resourceList) {
-        if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
-          
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
-          tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-          tsFileManager.add(tsFileResource, isSeq);
-          resourceListForAsyncRecover.add(tsFileResource);
-        } else {
-          resourceListForSyncRecover.add(tsFileResource);
-        }
-      }
-      if (!resourceListForAsyncRecover.isEmpty()) {
-        asyncRecoverTask =
-            asyncRecoverFilesInPartition(partitionId, context, 
resourceListForAsyncRecover, isSeq);
-      }
-      if (!resourceListForSyncRecover.isEmpty()) {
-        syncRecoverFilesInPartition(partitionId, context, 
resourceListForSyncRecover, isSeq);
+    List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
+    List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
+    Callable<Void> asyncRecoverTask = null;
+    for (TsFileResource tsFileResource : resourceList) {
+      if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
+        
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
+        tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+        tsFileManager.add(tsFileResource, isSeq);
+        resourceListForAsyncRecover.add(tsFileResource);
+      } else {
+        resourceListForSyncRecover.add(tsFileResource);
       }
-      return asyncRecoverTask;
-    } else {
-      syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq);
-      return null;
     }
+    if (!resourceListForAsyncRecover.isEmpty()) {
+      asyncRecoverTask =
+          asyncRecoverFilesInPartition(partitionId, context, 
resourceListForAsyncRecover, isSeq);
+    }
+    if (!resourceListForSyncRecover.isEmpty()) {
+      syncRecoverFilesInPartition(partitionId, context, 
resourceListForSyncRecover, isSeq);
+    }
+    return asyncRecoverTask;
   }
 
   private Callable<Void> asyncRecoverFilesInPartition(
@@ -3780,9 +3777,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public void compactFileTimeIndexCache() {
-    for (long timePartition : partitionMaxFileVersions.keySet()) {
-      tsFileManager.compactFileTimeIndexCache(timePartition);
-    }
+    tsFileManager.compactFileTimeIndexCache();
   }
 
   @TestOnly
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 06409cf3cf8..50aee584f9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -392,16 +392,17 @@ public class TsFileManager {
         && unsequenceFiles.higherKey(timePartitionId) == null);
   }
 
-  public void compactFileTimeIndexCache(long timePartition) {
+  public void compactFileTimeIndexCache() {
+    int currentResourceSize = size(true) + size(false);
     readLock();
     try {
       FileTimeIndexCacheRecorder.getInstance()
           .compactFileTimeIndexIfNeeded(
               storageGroupName,
               Integer.parseInt(dataRegionId),
-              timePartition,
-              sequenceFiles.get(timePartition),
-              unsequenceFiles.get(timePartition));
+              currentResourceSize,
+              sequenceFiles,
+              unsequenceFiles);
     } finally {
       readUnlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index bc24dec1049..b7973cc03df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -294,12 +294,18 @@ public class TsFileResource {
   }
 
   public static int getFileTimeIndexSerializedSize() {
-    // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp, 
tsFileID.fileVersion
-    // tsFileID.compactionVersion, timeIndex.getMinStartTime(), 
timeIndex.getMaxStartTime()
-    return 5 * Long.BYTES;
+    // 6 * 8 Byte means 6 long numbers of
+    // tsFileID.timePartitionId,
+    // tsFileID.timestamp,
+    // tsFileID.fileVersion,
+    // tsFileID.compactionVersion,
+    // timeIndex.getMinStartTime(),
+    // timeIndex.getMaxStartTime()
+    return 6 * Long.BYTES;
   }
 
   public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) {
+    buffer.putLong(tsFileID.timePartitionId);
     buffer.putLong(tsFileID.timestamp);
     buffer.putLong(tsFileID.fileVersion);
     buffer.putLong(tsFileID.compactionVersion);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
index b0c805bba6b..d6eb597922e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
@@ -57,8 +57,7 @@ public class FileTimeIndexCacheRecorder {
 
   private final BlockingQueue<Runnable> taskQueue = new 
LinkedBlockingQueue<>();
 
-  private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
-      new ConcurrentHashMap<>();
+  private final Map<Integer, FileTimeIndexCacheWriter> writerMap = new 
ConcurrentHashMap<>();
 
   private FileTimeIndexCacheRecorder() {
     recordFileIndexThread =
@@ -86,11 +85,10 @@ public class FileTimeIndexCacheRecorder {
       TsFileResource firstResource = tsFileResources[0];
       TsFileID tsFileID = firstResource.getTsFileID();
       int dataRegionId = tsFileID.regionId;
-      long partitionId = tsFileID.timePartitionId;
       File dataRegionSysDir =
           StorageEngine.getDataRegionSystemDir(
               firstResource.getDatabaseName(), 
firstResource.getDataRegionId());
-      FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId, 
dataRegionSysDir);
+      FileTimeIndexCacheWriter writer = getWriter(dataRegionId, 
dataRegionSysDir);
       boolean result =
           taskQueue.offer(
               () -> {
@@ -116,18 +114,14 @@ public class FileTimeIndexCacheRecorder {
   public void compactFileTimeIndexIfNeeded(
       String dataBaseName,
       int dataRegionId,
-      long partitionId,
-      TsFileResourceList sequenceFiles,
-      TsFileResourceList unsequenceFiles) {
+      int currentResourceCount,
+      Map<Long, TsFileResourceList> sequenceFiles,
+      Map<Long, TsFileResourceList> unsequenceFiles) {
     FileTimeIndexCacheWriter writer =
         getWriter(
             dataRegionId,
-            partitionId,
             StorageEngine.getDataRegionSystemDir(dataBaseName, 
String.valueOf(dataRegionId)));
 
-    int currentResourceCount =
-        (sequenceFiles == null ? 0 : sequenceFiles.size())
-            + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
     if (writer.getLogFile().length()
         > currentResourceCount * getFileTimeIndexSerializedSize() * 100L) {
 
@@ -136,25 +130,29 @@ public class FileTimeIndexCacheRecorder {
               () -> {
                 try {
                   writer.clearFile();
-                  if (sequenceFiles != null && !sequenceFiles.isEmpty()) {
-                    ByteBuffer buffer =
-                        ByteBuffer.allocate(
-                            getFileTimeIndexSerializedSize() * 
sequenceFiles.size());
-                    for (TsFileResource tsFileResource : sequenceFiles) {
-                      
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                  for (TsFileResourceList sequenceList : 
sequenceFiles.values()) {
+                    if (sequenceList != null && !sequenceList.isEmpty()) {
+                      ByteBuffer buffer =
+                          ByteBuffer.allocate(
+                              getFileTimeIndexSerializedSize() * 
sequenceList.size());
+                      for (TsFileResource tsFileResource : sequenceList) {
+                        
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                      }
+                      buffer.flip();
+                      writer.write(buffer);
                     }
-                    buffer.flip();
-                    writer.write(buffer);
                   }
-                  if (unsequenceFiles != null && !unsequenceFiles.isEmpty()) {
-                    ByteBuffer buffer =
-                        ByteBuffer.allocate(
-                            getFileTimeIndexSerializedSize() * 
unsequenceFiles.size());
-                    for (TsFileResource tsFileResource : unsequenceFiles) {
-                      
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                  for (TsFileResourceList unsequenceList : 
unsequenceFiles.values()) {
+                    if (unsequenceList != null && !unsequenceList.isEmpty()) {
+                      ByteBuffer buffer =
+                          ByteBuffer.allocate(
+                              getFileTimeIndexSerializedSize() * 
unsequenceList.size());
+                      for (TsFileResource tsFileResource : unsequenceList) {
+                        
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                      }
+                      buffer.flip();
+                      writer.write(buffer);
                     }
-                    buffer.flip();
-                    writer.write(buffer);
                   }
                 } catch (IOException e) {
                   LOGGER.warn("Meet error when compact FileTimeIndexCache: 
{}", e.getMessage());
@@ -166,51 +164,41 @@ public class FileTimeIndexCacheRecorder {
     }
   }
 
-  private FileTimeIndexCacheWriter getWriter(
-      int dataRegionId, long partitionId, File dataRegionSysDir) {
-    return writerMap
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(
-            partitionId,
-            k -> {
-              File partitionDir =
-                  SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
String.valueOf(partitionId));
-              File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir, 
FILE_NAME);
-              try {
-                if (!partitionDir.exists() && !partitionDir.mkdirs()) {
-                  LOGGER.debug(
-                      "Partition directory has existed,filePath:{}",
-                      partitionDir.getAbsolutePath());
-                }
-                if (!logFile.createNewFile()) {
-                  LOGGER.debug(
-                      "Partition log file has existed,filePath:{}", 
logFile.getAbsolutePath());
-                }
-                return new FileTimeIndexCacheWriter(logFile, true);
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            });
+  private FileTimeIndexCacheWriter getWriter(int dataRegionId, File 
dataRegionSysDir) {
+    return writerMap.computeIfAbsent(
+        dataRegionId,
+        k -> {
+          File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, 
FILE_NAME);
+          try {
+            if (!dataRegionSysDir.exists() && !dataRegionSysDir.mkdirs()) {
+              LOGGER.debug(
+                  "DataRegionSysDir has existed,filePath:{}", 
dataRegionSysDir.getAbsolutePath());
+            }
+            if (!logFile.createNewFile()) {
+              LOGGER.debug("FileTimeIndex file has existed,filePath:{}", 
logFile.getAbsolutePath());
+            }
+            return new FileTimeIndexCacheWriter(logFile, true);
+          } catch (IOException e) {
+            LOGGER.error(
+                "FileTimeIndex log file create filed,filePath:{}", 
logFile.getAbsolutePath(), e);
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   public void close() throws IOException {
-    for (Map<Long, FileTimeIndexCacheWriter> partitionWriterMap : 
writerMap.values()) {
-      for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
-        writer.close();
-      }
+    for (FileTimeIndexCacheWriter writer : writerMap.values()) {
+      writer.close();
     }
   }
 
   public void removeFileTimeIndexCache(int dataRegionId) {
-    Map<Long, FileTimeIndexCacheWriter> partitionWriterMap = 
writerMap.get(dataRegionId);
-    if (partitionWriterMap != null) {
-      for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
-        try {
-          writer.close();
-          deleteDirectoryAndEmptyParent(writer.getLogFile());
-        } catch (IOException e) {
-          LOGGER.warn("Meet error when close FileTimeIndexCache: {}", 
e.getMessage());
-        }
+    for (FileTimeIndexCacheWriter writer : writerMap.values()) {
+      try {
+        writer.close();
+        deleteDirectoryAndEmptyParent(writer.getLogFile());
+      } catch (IOException e) {
+        LOGGER.warn("Meet error when close FileTimeIndexCache: {}", 
e.getMessage());
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
index 35682bd8020..d07210736e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
@@ -44,21 +43,19 @@ public class FileTimeIndexCacheReader {
   private final File logFile;
   private final long fileLength;
   private final int dataRegionId;
-  private final long partitionId;
 
-  public FileTimeIndexCacheReader(File logFile, String dataRegionId, long 
partitionId) {
+  public FileTimeIndexCacheReader(File logFile, String dataRegionId) {
     this.logFile = logFile;
     this.fileLength = logFile.length();
     this.dataRegionId = Integer.parseInt(dataRegionId);
-    this.partitionId = partitionId;
   }
 
-  public Map<TsFileID, FileTimeIndex> read() throws IOException {
-    Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+  public void read(Map<TsFileID, FileTimeIndex> fileTimeIndexMap) throws 
IOException {
     long readLength = 0L;
     try (DataInputStream logStream =
         new DataInputStream(new 
BufferedInputStream(Files.newInputStream(logFile.toPath())))) {
       while (readLength < fileLength) {
+        long partitionId = logStream.readLong();
         long timestamp = logStream.readLong();
         long fileVersion = logStream.readLong();
         long compactionVersion = logStream.readLong();
@@ -79,6 +76,5 @@ public class FileTimeIndexCacheReader {
         channel.truncate(readLength);
       }
     }
-    return fileTimeIndexMap;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
index dce4f8b1103..3d88575ba68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
@@ -53,6 +53,10 @@ public class FileTimeIndexCacheWriter implements ILogWriter {
   public void write(ByteBuffer logBuffer) throws IOException {
 
     try {
+      if (!this.logFile.exists()) {
+        // For UT env, logFile may not be created
+        return;
+      }
       channel.write(logBuffer);
       if (this.forceEachWrite) {
         channel.force(true);

Reply via email to