This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch speed_up_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3e6580abccf4885e741c3740b011fbf4a1745b49
Author: HTHou <[email protected]>
AuthorDate: Wed Jul 24 10:28:25 2024 +0800

    dev
---
 .../db/storageengine/dataregion/DataRegion.java    |  68 +++++++------
 .../dataregion/memtable/TsFileProcessor.java       |   3 +
 .../dataregion/tsfile/TsFileResource.java          |  18 +++-
 .../tsfile/timeindex/FileTimeIndexCache.java       |  50 ----------
 .../tsfile/timeindex/PartitionLogRecorder.java     | 109 +++++++++++++++++++++
 5 files changed, 167 insertions(+), 81 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1396ab501f4..c1f9575f680 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
+import java.io.FileNotFoundException;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -112,6 +113,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.db.utils.writelog.PartitionLogReader;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -243,8 +245,8 @@ public class DataRegion implements IDataRegionForQuery {
   /** database name. */
   private final String databaseName;
 
-  /** database system directory. */
-  private File storageGroupSysDir;
+  /** data region system directory. */
+  private File dataRegionSysDir;
 
   /** manage seqFileList and unSeqFileList. */
   private final TsFileManager tsFileManager;
@@ -320,14 +322,13 @@ public class DataRegion implements IDataRegionForQuery {
     this.fileFlushPolicy = fileFlushPolicy;
     acquireDirectBufferMemory();
 
-    storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
-    this.tsFileManager =
-        new TsFileManager(databaseName, dataRegionId, storageGroupSysDir.getPath());
-    if (storageGroupSysDir.mkdirs()) {
+    dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
+    this.tsFileManager = new TsFileManager(databaseName, dataRegionId, dataRegionSysDir.getPath());
+    if (dataRegionSysDir.mkdirs()) {
       logger.info(
-          "Database system Directory {} doesn't exist, create it", storageGroupSysDir.getPath());
-    } else if (!storageGroupSysDir.exists()) {
-      logger.error("create database system Directory {} failed", storageGroupSysDir.getPath());
+          "Database system Directory {} doesn't exist, create it", dataRegionSysDir.getPath());
+    } else if (!dataRegionSysDir.exists()) {
+      logger.error("create database system Directory {} failed", dataRegionSysDir.getPath());
     }
 
     lastFlushTimeMap = new HashLastFlushTimeMap();
@@ -384,15 +385,6 @@ public class DataRegion implements IDataRegionForQuery {
     isReady = ready;
   }
 
-  private Map<Long, List<TsFileResource>> splitResourcesByPartition(
-      List<TsFileResource> resources) {
-    Map<Long, List<TsFileResource>> ret = new TreeMap<>();
-    for (TsFileResource resource : resources) {
-      ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
-    }
-    return ret;
-  }
-
   /** this class is used to store recovering context. */
   private class DataRegionRecoveryContext {
     /** number of files to be recovered. */
@@ -451,19 +443,16 @@ public class DataRegion implements IDataRegionForQuery {
 
     try {
       // collect candidate TsFiles from sequential and unsequential data directory
-      List<TsFileResource> tmpSeqTsFiles =
-          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
-      List<TsFileResource> tmpUnseqTsFiles =
-          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
-
       // split by partition so that we can find the last file of each partition and decide to
       // close it or not
-      DataRegionRecoveryContext dataRegionRecoveryContext =
-          new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size());
       Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
-          splitResourcesByPartition(tmpSeqTsFiles);
+          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
       Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
-          splitResourcesByPartition(tmpUnseqTsFiles);
+          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
+      DataRegionRecoveryContext dataRegionRecoveryContext =
+          new DataRegionRecoveryContext(
+              partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
+                  + partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum());
       // submit unsealed TsFiles to recover
       List<WALRecoverListener> recoverListeners = new ArrayList<>();
       for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
@@ -646,7 +635,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private List<TsFileResource> getAllFiles(List<String> folders)
+  private Map<Long, List<TsFileResource>> getAllFiles(List<String> folders)
       throws IOException, DataRegionException {
     // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition
     Map<String, File> tsFilePartitionPath2File = new HashMap<>();
@@ -684,10 +673,12 @@ public class DataRegion implements IDataRegionForQuery {
     sortedFiles.sort(this::compareFileName);
 
     long currentTime = System.currentTimeMillis();
-    List<TsFileResource> ret = new ArrayList<>();
+    Map<Long, List<TsFileResource>> ret = new TreeMap<>();
     for (File f : sortedFiles) {
       checkTsFileTime(f, currentTime);
-      ret.add(new TsFileResource(f));
+      TsFileResource resource = new TsFileResource(f);
+      ret.computeIfAbsent(resource.getTsFileID().timePartitionId, l -> new ArrayList<>())
+          .add(resource);
     }
     return ret;
   }
@@ -826,6 +817,19 @@ public class DataRegion implements IDataRegionForQuery {
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
       boolean isSeq) {
+
+    // TODO: read partition file
+    File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId));
+    if (logFile.exists()) {
+      try {
+        PartitionLogReader logReader = new PartitionLogReader(logFile);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+    }
+
+
     for (TsFileResource tsFileResource : resourceList) {
       recoverSealedTsFiles(tsFileResource, context, isSeq);
     }
@@ -3506,6 +3510,10 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  public File getDataRegionSysDir() {
+    return dataRegionSysDir;
+  }
+
   public void addSettleFilesToList(
       List<TsFileResource> seqResourcesToBeSettled,
       List<TsFileResource> unseqResourcesToBeSettled,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index ae974b17519..e178290f45c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlign
 import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl;
 import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedFileScanHandleImpl;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.PartitionLogRecorder;
 import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
@@ -1581,6 +1582,8 @@ public class TsFileProcessor {
     }
     writer.endFile();
     tsFileResource.serialize();
+    PartitionLogRecorder.getInstance()
+        .submitTask(dataRegionInfo.getDataRegion().getDataRegionSysDir(), tsFileResource);
     if (logger.isDebugEnabled()) {
       logger.debug("Ended file {}", tsFileResource);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 384ee410c54..0727d7e770f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -49,6 +50,7 @@ import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.FilePathUtils;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
@@ -56,10 +58,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -232,7 +236,6 @@ public class TsFileResource {
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
     fsFactory.deleteIfExists(dest);
     fsFactory.moveFile(src, dest);
-
   }
 
   private void serializeTo(BufferedOutputStream outputStream) throws IOException {
@@ -293,6 +296,19 @@ public class TsFileResource {
     }
   }
 
+  public ByteBuffer serializeFileTimeIndexToByteBuffer() {
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      outputStream.writeLong(tsFileID.fileVersion);
+      outputStream.writeLong(tsFileID.compactionVersion);
+      outputStream.writeLong(timeIndex.getMinStartTime());
+      outputStream.writeLong(timeIndex.getMaxEndTime());
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    } catch (IOException e) {
+      throw new SerializationRunTimeException(e);
+    }
+  }
+
   public void updateStartTime(IDeviceID device, long time) {
     timeIndex.updateStartTime(device, time);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCache.java
deleted file mode 100644
index ca9118d7d78..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class FileTimeIndexCache {
-
-  private ScheduledExecutorService recordFileIndexThread;
-
-  private final BlockingQueue<TsFileResource> resourceQueue = new ArrayBlockingQueue<>(100);
-
-  public void add(TsFileResource resource) {
-    resourceQueue.add(resource);
-  }
-
-//  public void recordTsFileResource() {
-//    recordFileIndexThread =
-//        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-//            ThreadName.FILE_TIMEINDEX_RECORD.getName());
-//        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-//            recordFileIndexThread, this::deleteOutdatedFiles, initDelayMs, periodMs,
-//     TimeUnit.MILLISECONDS);
-//  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java
new file mode 100644
index 00000000000..9ba45ca2f15
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.writelog.LogWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.utils.writelog.PartitionLogWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartitionLogRecorder {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionLogRecorder.class);
+
+  private final ScheduledExecutorService recordFileIndexThread;
+
+  private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
+
+  private final Map<Integer, Map<Long, PartitionLogWriter>> writerMap = new HashMap<>();
+
+  private PartitionLogRecorder() {
+    recordFileIndexThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIMEINDEX_RECORD.getName());
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        recordFileIndexThread, this::executeTasks, 0, 1, TimeUnit.SECONDS);
+  }
+
+  private void executeTasks() {
+    Runnable task;
+    while ((task = taskQueue.poll()) != null) {
+      recordFileIndexThread.submit(task);
+    }
+  }
+
+  public void submitTask(File dataRegionSysDir, TsFileResource tsFileResource) {
+    TsFileID tsFileID = tsFileResource.getTsFileID();
+    int dataRegionId = tsFileID.regionId;
+    long partitionId = tsFileID.timePartitionId;
+
+    PartitionLogWriter writer =
+        writerMap
+            .computeIfAbsent(dataRegionId, k -> new HashMap<>())
+            .computeIfAbsent(
+                partitionId,
+                k -> {
+                  try {
+                    File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId));
+                    if (!logFile.createNewFile()) {
+                      LOGGER.warn(
+                          "Partition log file has existed,filePath:{}",
+                          logFile.getAbsolutePath());
+                    }
+                    return new PartitionLogWriter(logFile, false);
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                });
+    taskQueue.offer(
+        () -> {
+          try {
+            writer.write(tsFileResource.serializeFileTimeIndexToByteBuffer());
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  public static PartitionLogRecorder getInstance() {
+    return PartitionLogRecorder.InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final PartitionLogRecorder INSTANCE = new PartitionLogRecorder();
+  }
+}

Reply via email to