This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch speed_up_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/speed_up_recover by this push:
new 9f3fb2fbd90 update last flush time after async recover finished
9f3fb2fbd90 is described below
commit 9f3fb2fbd903fc25b23f1fbed09caf81930f0a31
Author: HTHou <[email protected]>
AuthorDate: Tue Jul 30 18:14:15 2024 +0800
update last flush time after async recover finished
---
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 2 +-
.../iotdb/db/storageengine/StorageEngine.java | 56 ++++++++++-----
.../db/storageengine/dataregion/DataRegion.java | 83 +++++++++++++---------
.../dataregion/HashLastFlushTimeMap.java | 60 ++++++++++------
.../dataregion/ILastFlushTimeMap.java | 9 +--
.../schedule/CompactionScheduleTaskWorker.java | 2 +-
.../compaction/schedule/TTLScheduleTask.java | 2 +-
8 files changed, 133 insertions(+), 83 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5cffe736390..d7c906f39f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1782,7 +1782,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus startRepairData() throws TException {
- if (!storageEngine.isReadyForReadAndWrite()) {
+ if (!storageEngine.isReady()) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all
sg is ready");
}
IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 292786ef7ec..52c038548e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1084,7 +1084,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(e);
}
} else {
- if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+ if (!StorageEngine.getInstance().isReady()) {
future.setException(
new IoTDBException(
"not all sg is ready",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3853bc56069..429d9d84ac5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -134,19 +134,21 @@ public class StorageEngine implements IService {
/** number of ready data region */
private AtomicInteger readyDataRegionNum;
- private AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean(false);
+ private final AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean();
+
+ private final AtomicBoolean isReady = new AtomicBoolean();
private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
- private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
+ private final TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
/** used to do short-lived asynchronous tasks */
private ExecutorService cachedThreadPool;
// add customized listeners here for flush and close events
- private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
- private List<FlushListener> customFlushListeners = new ArrayList<>();
+ private final List<CloseFileListener> customCloseFileListeners = new
ArrayList<>();
+ private final List<FlushListener> customFlushListeners = new ArrayList<>();
private int recoverDataRegionNum = 0;
private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
@@ -182,12 +184,13 @@ public class StorageEngine implements IService {
return isReadyForReadAndWrite.get();
}
- public void setReadyForReadAndWrite(boolean isReadyForReadAndWrite) {
- this.isReadyForReadAndWrite.set(isReadyForReadAndWrite);
+ public boolean isReady() { // reads the isReady flag, which is tracked separately from isReadyForReadAndWrite
+ return isReady.get();
 }
- public void asyncRecover() throws StartupException {
- setReadyForReadAndWrite(false);
+ private void asyncRecoverDataRegion() throws StartupException {
+ isReady.set(false);
+ isReadyForReadAndWrite.set(false);
cachedThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -208,8 +211,7 @@ public class StorageEngine implements IService {
new Thread(
() -> {
checkResults(futures, "StorageEngine failed to recover.");
- recoverRepairData();
- setReadyForReadAndWrite(true);
+ isReadyForReadAndWrite.set(true);
},
ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
recoverEndTrigger.start();
@@ -283,11 +285,11 @@ public class StorageEngine implements IService {
throw new StorageEngineFailureException(e);
}
- asyncRecover();
-
- LOGGER.info("start ttl check thread successfully.");
+ asyncRecoverDataRegion();
startTimedService();
+
+ asyncRecoverTsFileResource();
}
private void startTimedService() {
@@ -335,6 +337,28 @@ public class StorageEngine implements IService {
}
}
+  /**
+   * Submits the pending TsFile resource recover tasks of every data region to the cached thread
+   * pool, then starts a trigger thread that checks all submitted futures, calls {@code
+   * recoverRepairData()} and finally marks the engine as ready via {@link #isReady()}.
+   *
+   * <p>NOTE(review): this method reads {@code dataRegionMap} immediately; it presumably expects
+   * the data regions to be registered already when it is called - confirm against the caller.
+   */
+  private void asyncRecoverTsFileResource() {
+    List<Future<Void>> futures = new LinkedList<>();
+    for (DataRegion dataRegion : dataRegionMap.values()) {
+      if (dataRegion == null) {
+        continue;
+      }
+      List<Callable<Void>> asyncTsFileResourceRecoverTasks =
+          dataRegion.getAsyncTsFileResourceRecoverTaskList();
+      // The task list is only created on the recover branch of the DataRegion constructor, so a
+      // region built through the other branch has nothing to submit.
+      if (asyncTsFileResourceRecoverTasks == null) {
+        continue;
+      }
+      for (Callable<Void> task : asyncTsFileResourceRecoverTasks) {
+        futures.add(cachedThreadPool.submit(task));
+      }
+    }
+    Thread recoverEndTrigger =
+        new Thread(
+            () -> {
+              checkResults(futures, "async recover tsfile resource meets error.");
+              recoverRepairData();
+              // isReadyForReadAndWrite is already switched on by the data region recover trigger.
+              // This stage has to switch on isReady, otherwise isReady() never becomes true and
+              // every caller gated on it (repair, compaction schedule, TTL) stays blocked.
+              isReady.set(true);
+            },
+            ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
+    recoverEndTrigger.start();
+  }
+
@Override
public void stop() {
for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -645,8 +669,6 @@ public class StorageEngine implements IService {
/**
* Add a listener to listen flush start/end events. Notice that this
addition only applies to
* TsFileProcessors created afterwards.
- *
- * @param listener
*/
public void registerFlushListener(FlushListener listener) {
customFlushListeners.add(listener);
@@ -655,8 +677,6 @@ public class StorageEngine implements IService {
/**
* Add a listener to listen file close events. Notice that this addition
only applies to
* TsFileProcessors created afterwards.
- *
- * @param listener
*/
public void registerCloseFileListener(CloseFileListener listener) {
customCloseFileListeners.add(listener);
@@ -672,7 +692,7 @@ public class StorageEngine implements IService {
}
// When registering a new region, the coordinator needs to register the
corresponding region with
- // the local storageengine before adding the corresponding consensusGroup to
the consensus layer
+ // the local storage before adding the corresponding consensusGroup to the
consensus layer
public DataRegion createDataRegion(DataRegionId regionId, String sg) throws
DataRegionException {
makeSureNoOldRegion(regionId);
AtomicReference<DataRegionException> exceptionAtomicReference = new
AtomicReference<>(null);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6a04147e172..082cde7cfa3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -157,7 +156,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@@ -287,7 +285,7 @@ public class DataRegion implements IDataRegionForQuery {
/** whether it's ready from recovery. */
private boolean isReady = false;
- private ExecutorService asyncRecoverExecutorService;
+ private List<Callable<Void>> asyncTsFileResourceRecoverTaskList;
/** close file listeners. */
private List<CloseFileListener> customCloseFileListeners =
Collections.emptyList();
@@ -364,8 +362,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
} else {
- asyncRecoverExecutorService =
-
IoTDBThreadPoolFactory.newSingleThreadExecutor("AsyncTsFileResourceRecover");
+ asyncTsFileResourceRecoverTaskList = new ArrayList<>();
recover();
}
@@ -390,6 +387,10 @@ public class DataRegion implements IDataRegionForQuery {
return isReady;
}
+  /**
+   * Returns the TsFile resource recover tasks collected for this region.
+   *
+   * @return the collected tasks, or an empty list when no task list was created (the list is only
+   *     instantiated on the recover branch of the constructor); never {@code null}
+   */
+  public List<Callable<Void>> getAsyncTsFileResourceRecoverTaskList() {
+    return asyncTsFileResourceRecoverTaskList == null
+        ? Collections.emptyList()
+        : asyncTsFileResourceRecoverTaskList;
+  }
+
/** this class is used to store recovering context. */
private class DataRegionRecoveryContext {
/** number of files to be recovered. */
@@ -446,7 +447,6 @@ public class DataRegion implements IDataRegionForQuery {
throw new DataRegionException(e);
}
- List<Callable<?>> asyncRecoverTaskList = new ArrayList<>();
try {
// collect candidate TsFiles from sequential and unsequential data
directory
// split by partition so that we can find the last file of each
partition and decide to
@@ -539,26 +539,26 @@ public class DataRegion implements IDataRegionForQuery {
((TreeMap<Long, List<TsFileResource>>)
partitionTmpUnseqTsFiles).lastKey());
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpSeqTsFiles.entrySet()) {
- Callable<?> asyncRecoverTask =
+ Callable<Void> asyncRecoverTask =
recoverFilesInPartition(
partitionFiles.getKey(),
dataRegionRecoveryContext,
partitionFiles.getValue(),
true);
if (asyncRecoverTask != null) {
- asyncRecoverTaskList.add(asyncRecoverTask);
+ asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
}
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpUnseqTsFiles.entrySet()) {
- Callable<?> asyncRecoverTask =
+ Callable<Void> asyncRecoverTask =
recoverFilesInPartition(
partitionFiles.getKey(),
dataRegionRecoveryContext,
partitionFiles.getValue(),
false);
if (asyncRecoverTask != null) {
- asyncRecoverTaskList.add(asyncRecoverTask);
+ asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
}
}
if (config.isEnableSeparateData()) {
@@ -608,7 +608,7 @@ public class DataRegion implements IDataRegionForQuery {
throw new DataRegionException(e);
}
- if (isReady) {
+ if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
initCompactionSchedule();
}
@@ -642,6 +642,23 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+  /**
+   * Collects the end time of every device found in {@code resources} and pushes the result into
+   * the last flush time map.
+   *
+   * <p>When data separation is enabled the per-partition entry is upgraded and updated; when the
+   * last cache is enabled the global per-device flushed time is updated as well.
+   *
+   * @param timePartitionId id of the time partition the resources belong to
+   * @param resources TsFile resources whose device end times are collected
+   */
+  protected void upgradeAndUpdateDeviceLastFlushTime(
+      long timePartitionId, List<TsFileResource> resources) {
+    Map<IDeviceID, Long> endTimeMap = new HashMap<>();
+    for (TsFileResource resource : resources) {
+      for (IDeviceID deviceId : resource.getDevices()) {
+        // A device may appear in several resources; keep the largest end time so the result does
+        // not depend on the order of the resource list.
+        endTimeMap.merge(deviceId, resource.getEndTime(deviceId), Long::max);
+      }
+    }
+    if (config.isEnableSeparateData()) {
+      lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, endTimeMap);
+    }
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
+    }
+  }
+
public void initCompactionSchedule() {
if (!config.isEnableSeqSpaceCompaction()
&& !config.isEnableUnseqSpaceCompaction()
@@ -842,7 +859,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private Callable<?> recoverFilesInPartition(
+ private Callable<Void> recoverFilesInPartition(
long partitionId,
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
@@ -862,7 +879,7 @@ public class DataRegion implements IDataRegionForQuery {
}
List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
- Callable<?> asyncRecoverTask = null;
+ Callable<Void> asyncRecoverTask = null;
for (TsFileResource tsFileResource : resourceList) {
if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
@@ -892,24 +909,6 @@ public class DataRegion implements IDataRegionForQuery {
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
boolean isSeq) {
- // TODO: async recover
- Callable<Void> recoverSealedTsFileTask =
- () -> {
- for (TsFileResource tsFileResource : resourceList) {
- try (SealedTsFileRecoverPerformer recoverPerformer =
- new SealedTsFileRecoverPerformer(tsFileResource)) {
- recoverPerformer.recover();
-
tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
- } catch (Throwable e) {
- logger.error(
- "Fail to recover sealed TsFile {}, skip it.",
tsFileResource.getTsFilePath(), e);
- } finally {
- // update recovery context
- context.incrementRecoveredFilesNum();
- }
- }
- return null;
- };
if (config.isEnableSeparateData()) {
if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId,
false)) {
TimePartitionManager.getInstance()
@@ -932,7 +931,27 @@ public class DataRegion implements IDataRegionForQuery {
lastFlushTimeMap.getMemSize(partitionId),
false);
}
- return recoverSealedTsFileTask;
+ return () -> {
+ for (TsFileResource tsFileResource : resourceList) {
+ try (SealedTsFileRecoverPerformer recoverPerformer =
+ new SealedTsFileRecoverPerformer(tsFileResource)) {
+ recoverPerformer.recover();
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+ } catch (Throwable e) {
+ logger.error(
+ "Fail to recover sealed TsFile {}, skip it.",
tsFileResource.getTsFilePath(), e);
+ } finally {
+ // update recovery context
+ context.incrementRecoveredFilesNum();
+ }
+ }
+ // TODO: After recover, replace partition last flush time with device
last flush time
+ if (config.isEnableSeparateData()) {
+ upgradeAndUpdateDeviceLastFlushTime(partitionId, resourceList);
+ }
+
+ return null;
+ };
}
private void syncRecoverFilesInPartition(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 9a827c87440..439e255115d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -59,22 +59,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
/** record memory cost of map for each partitionId */
private final Map<Long, Long> memCostForEachPartition = new
ConcurrentHashMap<>();
- // For load
- @Override
- public void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID
deviceId, long time) {
- ILastFlushTime flushTimeMapForPartition =
- partitionLatestFlushedTime.computeIfAbsent(
- timePartitionId, id -> new DeviceLastFlushTime());
- long lastFlushTime = flushTimeMapForPartition.getLastFlushTime(deviceId);
- if (lastFlushTime == Long.MIN_VALUE) {
- long memCost = HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed();
- memCostForEachPartition.compute(
- timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
- }
- flushTimeMapForPartition.updateLastFlushTime(deviceId, time);
- }
-
- // For recover
+ // For recover and load
@Override
public void updateMultiDeviceFlushedTime(
long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
@@ -82,7 +67,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
partitionLatestFlushedTime.computeIfAbsent(
timePartitionId, id -> new DeviceLastFlushTime());
- long memIncr = 0;
+ long memIncr = 0L;
for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
@@ -94,6 +79,41 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
}
+  // For recover only
+  /**
+   * Applies device-level flushed times to a time partition, replacing a partition-level entry
+   * ({@code PartitionLastFlushTime}) with a device-level one ({@code DeviceLastFlushTime}) first.
+   *
+   * <p>If the partition already holds a device-level entry, the times are applied to it directly.
+   * The memory cost recorded for the partition grows by one map node per device that the target
+   * entry did not know before.
+   *
+   * @param timePartitionId id of the time partition to update
+   * @param flushedTimeMap flushed time of each device in this partition
+   */
+  @Override
+  public void upgradeAndUpdateMultiDeviceFlushedTime(
+      long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
+    ILastFlushTime currentFlushTime =
+        partitionLatestFlushedTime.computeIfAbsent(
+            timePartitionId, id -> new DeviceLastFlushTime());
+    boolean needUpgrade = currentFlushTime instanceof PartitionLastFlushTime;
+    // On upgrade the times are collected in a fresh device-level entry; otherwise (should not
+    // normally happen during recover) the existing device-level entry is updated in place.
+    ILastFlushTime targetFlushTime = needUpgrade ? new DeviceLastFlushTime() : currentFlushTime;
+
+    long memIncr = 0L;
+    for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+      if (targetFlushTime.getLastFlushTime(entry.getKey()) == Long.MIN_VALUE) {
+        memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+      }
+      targetFlushTime.updateLastFlushTime(entry.getKey(), entry.getValue());
+    }
+    if (needUpgrade) {
+      // Install the device-level entry; without this the upgrade would be discarded while its
+      // memory cost is still recorded below.
+      partitionLatestFlushedTime.put(timePartitionId, targetFlushTime);
+    }
+    long finalMemIncr = memIncr;
+    memCostForEachPartition.compute(
+        timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + finalMemIncr);
+  }
+
// For recover
@Override
public void updatePartitionFlushedTime(long timePartitionId, long
maxFlushedTime) {
@@ -108,12 +128,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
timePartitionId, (k1, v1) -> v1 == null ? memIncr : v1 + memIncr);
}
- @Override
- public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) {
- globalLatestFlushedTimeForEachDevice.compute(
- path, (k, v) -> v == null ? time : Math.max(v, time));
- }
-
@Override
public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap) {
for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 69c8345dcbf..7bdd141bf6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -27,18 +27,15 @@ import java.util.Map;
public interface ILastFlushTimeMap {
// region update
- /** Update partitionLatestFlushedTime. */
- void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId,
long time);
-
void updateMultiDeviceFlushedTime(long timePartitionId, Map<IDeviceID, Long>
flushedTimeMap);
void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
- /** Update globalLatestFlushedTimeForEachDevice. */
- void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time);
-
void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap);
+ void upgradeAndUpdateMultiDeviceFlushedTime(
+ long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
+
/** Update both partitionLatestFlushedTime and
globalLatestFlushedTimeForEachDevice. */
void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long> updateMap);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index e52ac8c09e4..92c5cb43c6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -53,7 +53,7 @@ public class CompactionScheduleTaskWorker implements
Callable<Void> {
while (true) {
try {
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs());
- if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+ if (!StorageEngine.getInstance().isReady()) {
continue;
}
List<DataRegion> dataRegionListSnapshot = new
ArrayList<>(dataRegionList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index 7a2c4b887ca..9c14430d682 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -53,7 +53,7 @@ public class TTLScheduleTask implements Callable<Void> {
while (true) {
try {
Thread.sleep(ttlCheckInterval);
- if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+ if (!StorageEngine.getInstance().isReady()) {
continue;
}
List<DataRegion> dataRegionListSnapshot = new
ArrayList<>(dataRegionList);