This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch speed_up_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/speed_up_recover by this push:
     new 9f3fb2fbd90 update last flush time after async recover finished
9f3fb2fbd90 is described below

commit 9f3fb2fbd903fc25b23f1fbed09caf81930f0a31
Author: HTHou <[email protected]>
AuthorDate: Tue Jul 30 18:14:15 2024 +0800

    update last flush time after async recover finished
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  2 +-
 .../iotdb/db/storageengine/StorageEngine.java      | 56 ++++++++++-----
 .../db/storageengine/dataregion/DataRegion.java    | 83 +++++++++++++---------
 .../dataregion/HashLastFlushTimeMap.java           | 60 ++++++++++------
 .../dataregion/ILastFlushTimeMap.java              |  9 +--
 .../schedule/CompactionScheduleTaskWorker.java     |  2 +-
 .../compaction/schedule/TTLScheduleTask.java       |  2 +-
 8 files changed, 133 insertions(+), 83 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5cffe736390..d7c906f39f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1782,7 +1782,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus startRepairData() throws TException {
-    if (!storageEngine.isReadyForReadAndWrite()) {
+    if (!storageEngine.isReady()) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all 
sg is ready");
     }
     IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 292786ef7ec..52c038548e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1084,7 +1084,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         future.setException(e);
       }
     } else {
-      if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      if (!StorageEngine.getInstance().isReady()) {
         future.setException(
             new IoTDBException(
                 "not all sg is ready", 
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3853bc56069..429d9d84ac5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -134,19 +134,21 @@ public class StorageEngine implements IService {
   /** number of ready data region */
   private AtomicInteger readyDataRegionNum;
 
-  private AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean(false);
+  private final AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean();
+
+  private final AtomicBoolean isReady = new AtomicBoolean();
 
   private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
   private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
 
-  private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
+  private final TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
 
   /** used to do short-lived asynchronous tasks */
   private ExecutorService cachedThreadPool;
 
   // add customized listeners here for flush and close events
-  private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
-  private List<FlushListener> customFlushListeners = new ArrayList<>();
+  private final List<CloseFileListener> customCloseFileListeners = new 
ArrayList<>();
+  private final List<FlushListener> customFlushListeners = new ArrayList<>();
   private int recoverDataRegionNum = 0;
 
   private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
@@ -182,12 +184,13 @@ public class StorageEngine implements IService {
     return isReadyForReadAndWrite.get();
   }
 
-  public void setReadyForReadAndWrite(boolean isReadyForReadAndWrite) {
-    this.isReadyForReadAndWrite.set(isReadyForReadAndWrite);
+  public boolean isReady() {
+    return isReady.get();
   }
 
-  public void asyncRecover() throws StartupException {
-    setReadyForReadAndWrite(false);
+  private void asyncRecoverDataRegion() throws StartupException {
+    isReady.set(false);
+    isReadyForReadAndWrite.set(false);
     cachedThreadPool =
         
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
 
@@ -208,8 +211,7 @@ public class StorageEngine implements IService {
         new Thread(
             () -> {
               checkResults(futures, "StorageEngine failed to recover.");
-              recoverRepairData();
-              setReadyForReadAndWrite(true);
+              isReadyForReadAndWrite.set(true);
             },
             ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
     recoverEndTrigger.start();
@@ -283,11 +285,11 @@ public class StorageEngine implements IService {
       throw new StorageEngineFailureException(e);
     }
 
-    asyncRecover();
-
-    LOGGER.info("start ttl check thread successfully.");
+    asyncRecoverDataRegion();
 
     startTimedService();
+
+    asyncRecoverTsFileResource();
   }
 
   private void startTimedService() {
@@ -335,6 +337,28 @@ public class StorageEngine implements IService {
     }
   }
 
+  /**
+   * Submits every data region's pending TsFileResource recover tasks to {@code cachedThreadPool}
+   * and starts a trigger thread that waits for all of them.
+   *
+   * <p>When the tasks have been checked, the trigger thread recovers repair data and flips
+   * {@code isReady} to true, which is the flag returned by {@link #isReady()}.
+   */
+  private void asyncRecoverTsFileResource() {
+    List<Future<Void>> futures = new LinkedList<>();
+    for (DataRegion dataRegion : dataRegionMap.values()) {
+      if (dataRegion == null) {
+        continue;
+      }
+      List<Callable<Void>> asyncTsFileResourceRecoverTasks =
+          dataRegion.getAsyncTsFileResourceRecoverTaskList();
+      // The task list is only created when the data region ran recover(); it may be null.
+      if (asyncTsFileResourceRecoverTasks == null) {
+        continue;
+      }
+      for (Callable<Void> task : asyncTsFileResourceRecoverTasks) {
+        futures.add(cachedThreadPool.submit(task));
+      }
+    }
+    Thread recoverEndTrigger =
+        new Thread(
+            () -> {
+              checkResults(futures, "async recover tsfile resource meets error.");
+              recoverRepairData();
+              // isReadyForReadAndWrite is already set by the data region recover trigger; this
+              // stage has to flip isReady, otherwise isReady() would stay false forever.
+              isReady.set(true);
+            },
+            ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
+    recoverEndTrigger.start();
+  }
+
   @Override
   public void stop() {
     for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -645,8 +669,6 @@ public class StorageEngine implements IService {
   /**
    * Add a listener to listen flush start/end events. Notice that this 
addition only applies to
    * TsFileProcessors created afterwards.
-   *
-   * @param listener
    */
   public void registerFlushListener(FlushListener listener) {
     customFlushListeners.add(listener);
@@ -655,8 +677,6 @@ public class StorageEngine implements IService {
   /**
    * Add a listener to listen file close events. Notice that this addition 
only applies to
    * TsFileProcessors created afterwards.
-   *
-   * @param listener
    */
   public void registerCloseFileListener(CloseFileListener listener) {
     customCloseFileListeners.add(listener);
@@ -672,7 +692,7 @@ public class StorageEngine implements IService {
   }
 
   // When registering a new region, the coordinator needs to register the 
corresponding region with
-  // the local storageengine before adding the corresponding consensusGroup to 
the consensus layer
+  // the local storage before adding the corresponding consensusGroup to the 
consensus layer
   public DataRegion createDataRegion(DataRegionId regionId, String sg) throws 
DataRegionException {
     makeSureNoOldRegion(regionId);
     AtomicReference<DataRegionException> exceptionAtomicReference = new 
AtomicReference<>(null);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6a04147e172..082cde7cfa3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -157,7 +156,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
@@ -287,7 +285,7 @@ public class DataRegion implements IDataRegionForQuery {
   /** whether it's ready from recovery. */
   private boolean isReady = false;
 
-  private ExecutorService asyncRecoverExecutorService;
+  private List<Callable<Void>> asyncTsFileResourceRecoverTaskList;
 
   /** close file listeners. */
   private List<CloseFileListener> customCloseFileListeners = 
Collections.emptyList();
@@ -364,8 +362,7 @@ public class DataRegion implements IDataRegionForQuery {
         }
       }
     } else {
-      asyncRecoverExecutorService =
-          
IoTDBThreadPoolFactory.newSingleThreadExecutor("AsyncTsFileResourceRecover");
+      asyncTsFileResourceRecoverTaskList = new ArrayList<>();
       recover();
     }
 
@@ -390,6 +387,10 @@ public class DataRegion implements IDataRegionForQuery {
     return isReady;
   }
 
+  /**
+   * Returns the TsFileResource recover tasks collected for asynchronous execution.
+   *
+   * @return the collected tasks, or an empty list if this data region never created the task list
+   *     (the list is only initialized on the code path that calls {@code recover()})
+   */
+  public List<Callable<Void>> getAsyncTsFileResourceRecoverTaskList() {
+    return asyncTsFileResourceRecoverTaskList == null
+        ? Collections.emptyList()
+        : asyncTsFileResourceRecoverTaskList;
+  }
+
   /** this class is used to store recovering context. */
   private class DataRegionRecoveryContext {
     /** number of files to be recovered. */
@@ -446,7 +447,6 @@ public class DataRegion implements IDataRegionForQuery {
       throw new DataRegionException(e);
     }
 
-    List<Callable<?>> asyncRecoverTaskList = new ArrayList<>();
     try {
       // collect candidate TsFiles from sequential and unsequential data 
directory
       // split by partition so that we can find the last file of each 
partition and decide to
@@ -539,26 +539,26 @@ public class DataRegion implements IDataRegionForQuery {
                   ((TreeMap<Long, List<TsFileResource>>) 
partitionTmpUnseqTsFiles).lastKey());
         }
         for (Entry<Long, List<TsFileResource>> partitionFiles : 
partitionTmpSeqTsFiles.entrySet()) {
-          Callable<?> asyncRecoverTask =
+          Callable<Void> asyncRecoverTask =
               recoverFilesInPartition(
                   partitionFiles.getKey(),
                   dataRegionRecoveryContext,
                   partitionFiles.getValue(),
                   true);
           if (asyncRecoverTask != null) {
-            asyncRecoverTaskList.add(asyncRecoverTask);
+            asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
           }
         }
         for (Entry<Long, List<TsFileResource>> partitionFiles :
             partitionTmpUnseqTsFiles.entrySet()) {
-          Callable<?> asyncRecoverTask =
+          Callable<Void> asyncRecoverTask =
               recoverFilesInPartition(
                   partitionFiles.getKey(),
                   dataRegionRecoveryContext,
                   partitionFiles.getValue(),
                   false);
           if (asyncRecoverTask != null) {
-            asyncRecoverTaskList.add(asyncRecoverTask);
+            asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
           }
         }
         if (config.isEnableSeparateData()) {
@@ -608,7 +608,7 @@ public class DataRegion implements IDataRegionForQuery {
       throw new DataRegionException(e);
     }
 
-    if (isReady) {
+    if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
       initCompactionSchedule();
     }
 
@@ -642,6 +642,23 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /**
+   * Collects the largest end time of every device found in {@code resources} and hands the result
+   * to {@code lastFlushTimeMap}.
+   *
+   * <p>The per-partition flush times are upgraded/updated only when separate data is enabled, and
+   * the global per-device flush times are updated only when the last cache is enabled.
+   *
+   * @param timePartitionId time partition the resources belong to
+   * @param resources recovered TsFile resources of that partition
+   */
+  protected void upgradeAndUpdateDeviceLastFlushTime(
+      long timePartitionId, List<TsFileResource> resources) {
+    Map<IDeviceID, Long> endTimeMap = new HashMap<>();
+    for (TsFileResource resource : resources) {
+      for (IDeviceID deviceId : resource.getDevices()) {
+        // A device may appear in several files; keep the largest end time instead of the value
+        // of whichever file happens to be iterated last.
+        endTimeMap.merge(deviceId, resource.getEndTime(deviceId), Long::max);
+      }
+    }
+    if (config.isEnableSeparateData()) {
+      lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, endTimeMap);
+    }
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
+    }
+  }
+
   public void initCompactionSchedule() {
     if (!config.isEnableSeqSpaceCompaction()
         && !config.isEnableUnseqSpaceCompaction()
@@ -842,7 +859,7 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  private Callable<?> recoverFilesInPartition(
+  private Callable<Void> recoverFilesInPartition(
       long partitionId,
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
@@ -862,7 +879,7 @@ public class DataRegion implements IDataRegionForQuery {
       }
       List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
       List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
-      Callable<?> asyncRecoverTask = null;
+      Callable<Void> asyncRecoverTask = null;
       for (TsFileResource tsFileResource : resourceList) {
         if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
           
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
@@ -892,24 +909,6 @@ public class DataRegion implements IDataRegionForQuery {
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
       boolean isSeq) {
-    // TODO: async recover
-    Callable<Void> recoverSealedTsFileTask =
-        () -> {
-          for (TsFileResource tsFileResource : resourceList) {
-            try (SealedTsFileRecoverPerformer recoverPerformer =
-                new SealedTsFileRecoverPerformer(tsFileResource)) {
-              recoverPerformer.recover();
-              
tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
-            } catch (Throwable e) {
-              logger.error(
-                  "Fail to recover sealed TsFile {}, skip it.", 
tsFileResource.getTsFilePath(), e);
-            } finally {
-              // update recovery context
-              context.incrementRecoveredFilesNum();
-            }
-          }
-          return null;
-        };
     if (config.isEnableSeparateData()) {
       if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId, 
false)) {
         TimePartitionManager.getInstance()
@@ -932,7 +931,27 @@ public class DataRegion implements IDataRegionForQuery {
               lastFlushTimeMap.getMemSize(partitionId),
               false);
     }
-    return recoverSealedTsFileTask;
+    return () -> {
+      for (TsFileResource tsFileResource : resourceList) {
+        try (SealedTsFileRecoverPerformer recoverPerformer =
+            new SealedTsFileRecoverPerformer(tsFileResource)) {
+          recoverPerformer.recover();
+          tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+        } catch (Throwable e) {
+          logger.error(
+              "Fail to recover sealed TsFile {}, skip it.", 
tsFileResource.getTsFilePath(), e);
+        } finally {
+          // update recovery context
+          context.incrementRecoveredFilesNum();
+        }
+      }
+      // TODO: After recover, replace partition last flush time with device 
last flush time
+      if (config.isEnableSeparateData()) {
+        upgradeAndUpdateDeviceLastFlushTime(partitionId, resourceList);
+      }
+
+      return null;
+    };
   }
 
   private void syncRecoverFilesInPartition(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 9a827c87440..439e255115d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -59,22 +59,7 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
   /** record memory cost of map for each partitionId */
   private final Map<Long, Long> memCostForEachPartition = new 
ConcurrentHashMap<>();
 
-  // For load
-  @Override
-  public void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID 
deviceId, long time) {
-    ILastFlushTime flushTimeMapForPartition =
-        partitionLatestFlushedTime.computeIfAbsent(
-            timePartitionId, id -> new DeviceLastFlushTime());
-    long lastFlushTime = flushTimeMapForPartition.getLastFlushTime(deviceId);
-    if (lastFlushTime == Long.MIN_VALUE) {
-      long memCost = HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed();
-      memCostForEachPartition.compute(
-          timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
-    }
-    flushTimeMapForPartition.updateLastFlushTime(deviceId, time);
-  }
-
-  // For recover
+  // For recover and load
   @Override
   public void updateMultiDeviceFlushedTime(
       long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
@@ -82,7 +67,7 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
         partitionLatestFlushedTime.computeIfAbsent(
             timePartitionId, id -> new DeviceLastFlushTime());
 
-    long memIncr = 0;
+    long memIncr = 0L;
     for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
       if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) == 
Long.MIN_VALUE) {
         memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
@@ -94,6 +79,41 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
         timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + 
finalMemIncr);
   }
 
+  // For recover only.
+  // If the given time partition currently holds a PartitionLastFlushTime, it is replaced by a
+  // DeviceLastFlushTime built from flushedTimeMap; otherwise the given times are merged into the
+  // existing record. In both cases the memory cost of the newly tracked devices is added to
+  // memCostForEachPartition.
+  @Override
+  public void upgradeAndUpdateMultiDeviceFlushedTime(
+      long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
+    ILastFlushTime flushTimeMapForPartition =
+        partitionLatestFlushedTime.computeIfAbsent(
+            timePartitionId, id -> new DeviceLastFlushTime());
+    long memIncr = 0L;
+    if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+      // upgrade
+      ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime();
+      for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+        memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+        newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(), entry.getValue());
+      }
+      // Publish the upgraded record; without this the freshly built map would be discarded and
+      // the partition would keep its partition-level record.
+      // NOTE(review): put() is not atomic with the instanceof check above - confirm whether two
+      // recover tasks can upgrade the same partition concurrently.
+      partitionLatestFlushedTime.put(timePartitionId, newDeviceLastFlushTime);
+    } else {
+      // Reached when the partition already holds a device-level record, e.g. it was upgraded by
+      // an earlier call; only devices that were not tracked yet add to the memory cost.
+      for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+        if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) == Long.MIN_VALUE) {
+          memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+        }
+        flushTimeMapForPartition.updateLastFlushTime(entry.getKey(), entry.getValue());
+      }
+    }
+    long finalMemIncr = memIncr;
+    memCostForEachPartition.compute(
+        timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + finalMemIncr);
+  }
+
   // For recover
   @Override
   public void updatePartitionFlushedTime(long timePartitionId, long 
maxFlushedTime) {
@@ -108,12 +128,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
         timePartitionId, (k1, v1) -> v1 == null ? memIncr : v1 + memIncr);
   }
 
-  @Override
-  public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) {
-    globalLatestFlushedTimeForEachDevice.compute(
-        path, (k, v) -> v == null ? time : Math.max(v, time));
-  }
-
   @Override
   public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap) {
     for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 69c8345dcbf..7bdd141bf6b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -27,18 +27,15 @@ import java.util.Map;
 public interface ILastFlushTimeMap {
 
   // region update
-  /** Update partitionLatestFlushedTime. */
-  void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId, 
long time);
-
   void updateMultiDeviceFlushedTime(long timePartitionId, Map<IDeviceID, Long> 
flushedTimeMap);
 
   void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
 
-  /** Update globalLatestFlushedTimeForEachDevice. */
-  void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time);
-
   void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap);
 
+  void upgradeAndUpdateMultiDeviceFlushedTime(
+      long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
+
   /** Update both partitionLatestFlushedTime and 
globalLatestFlushedTimeForEachDevice. */
   void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long> updateMap);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index e52ac8c09e4..92c5cb43c6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -53,7 +53,7 @@ public class CompactionScheduleTaskWorker implements 
Callable<Void> {
     while (true) {
       try {
         
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs());
-        if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+        if (!StorageEngine.getInstance().isReady()) {
           continue;
         }
         List<DataRegion> dataRegionListSnapshot = new 
ArrayList<>(dataRegionList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index 7a2c4b887ca..9c14430d682 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -53,7 +53,7 @@ public class TTLScheduleTask implements Callable<Void> {
     while (true) {
       try {
         Thread.sleep(ttlCheckInterval);
-        if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+        if (!StorageEngine.getInstance().isReady()) {
           continue;
         }
         List<DataRegion> dataRegionListSnapshot = new 
ArrayList<>(dataRegionList);

Reply via email to