This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
     new 10c8c6d2727 PBTree Flush Control Monitor (#11699)
10c8c6d2727 is described below

commit 10c8c6d272746f213b1ee8a4c94d8db63090fd41
Author: Chen YZ <[email protected]>
AuthorDate: Sun Dec 17 12:23:39 2023 +0800

    PBTree Flush Control Monitor (#11699)
    
    * save
    
    * save
    
    * save
    
    * fix some bugs
    
    * save
    
    * save
    
    * check
    
    * fix some comments
    
    * pre-allocate even if the device is using a template
    
    * test ci
    
    ---------
    
    Co-authored-by: BigGreyBear <[email protected]>
---
 .../persistence/schema/ConfigMTreeStore.java       |   7 +
 .../metric/SchemaEngineCachedMetric.java           |  65 +---
 .../schemaengine/metric/SchemaMetricManager.java   |   4 +-
 .../schemaengine/rescon/SchemaResourceManager.java |   6 +-
 .../schemaregion/impl/SchemaRegionPBTreeImpl.java  |  10 +-
 .../schemaregion/mtree/IMTreeStore.java            |   3 +
 .../schemaregion/mtree/impl/mem/MemMTreeStore.java |   9 +-
 .../mtree/impl/pbtree/CachedMTreeStore.java        |  96 +++---
 .../pbtree/ReentrantReadOnlyCachedMTreeStore.java  |   6 +
 .../impl/pbtree/cache/CacheMemoryManager.java      | 355 --------------------
 .../impl/pbtree/cache/ReleaseFlushMonitor.java     | 373 +++++++++++++++++++++
 .../impl/pbtree/flush/PBTreeFlushExecutor.java     |  71 +++-
 .../mtree/impl/pbtree/flush/Scheduler.java         | 261 ++++++++++++++
 .../pbtree/lock/StampedWriterPreferredLock.java    |   2 +-
 .../pbtree/memcontrol/IReleaseFlushStrategy.java   |   3 -
 .../ReleaseFlushStrategyNumBasedImpl.java          |   5 -
 .../ReleaseFlushStrategySizeBasedImpl.java         |  11 +-
 .../pbtree/schemafile/pagemgr/PageManager.java     |  15 +-
 .../schemaregion/mtree/traverser/Traverser.java    |   9 +-
 .../db/metadata/mtree/schemafile/MonitorTest.java  |  85 +++++
 .../schemaRegion/SchemaStatisticsTest.java         |  11 +-
 .../datanode/src/test/resources/logback-test.xml   |   1 +
 .../commons/concurrent/IoTDBThreadPoolFactory.java |  22 ++
 .../iotdb/commons/concurrent/ThreadName.java       |  14 +-
 .../threadpool/WrappedThreadPoolExecutor.java      |  18 +
 25 files changed, 926 insertions(+), 536 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
index 5f8f45a660e..ce384e87959 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.AbstractTraverserIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MNodeIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MemoryTraverserIterator;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import org.apache.iotdb.db.schemaengine.template.Template;
 
 import java.io.File;
@@ -143,4 +144,10 @@ public class ConfigMTreeStore implements 
IMTreeStore<IConfigMNode> {
   public boolean createSnapshot(File snapshotDir) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+    // do nothing
+    return null;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
index 59c3e417b80..ee14822fd6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
@@ -19,32 +19,27 @@
 
 package org.apache.iotdb.db.schemaengine.metric;
 
-import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
-import java.util.concurrent.TimeUnit;
-
 public class SchemaEngineCachedMetric implements ISchemaEngineMetric {
 
   // TODO: rename schema_file to pbtree
   private static final String RELEASE_THRESHOLD = 
"schema_file_release_threshold";
-  private static final String FLUSH_THRESHOLD = "schema_file_flush_threshold";
   private static final String PINNED_NODE_NUM = "schema_file_pinned_num";
   private static final String UNPINNED_NODE_NUM = "schema_file_unpinned_num";
   private static final String PINNED_MEM_SIZE = "schema_file_pinned_mem";
   private static final String UNPINNED_MEM_SIZE = "schema_file_unpinned_mem";
-  private static final String RELEASE_TIMER = "schema_file_release";
-  private static final String FLUSH_TIMER = "schema_file_flush";
-  private static final String RELEASE_THREAD_NUM = 
"schema_file_release_thread_num";
-  private static final String FLUSH_THREAD_NUM = 
"schema_file_flush_thread_num";
+  private static final String RELEASE_TIMER = "schema_file_release"; // TODO: 
implement it
+  private static final String FLUSH_TIMER = "schema_file_flush"; // TODO: 
implement it
+  private static final String RELEASE_FLUSH_THREAD_NUM = 
"schema_file_release_flush_thread_num";
 
   private final CachedSchemaEngineStatistics engineStatistics;
 
@@ -66,14 +61,6 @@ public class SchemaEngineCachedMetric implements 
ISchemaEngineMetric {
         MetricLevel.IMPORTANT,
         Tag.NAME.toString(),
         RELEASE_THRESHOLD);
-    metricService.gauge(
-        (long)
-            
(IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion()
-                * ReleaseFlushStrategySizeBasedImpl.FLUSH_THRESHOLD_RATION),
-        Metric.SCHEMA_ENGINE.toString(),
-        MetricLevel.IMPORTANT,
-        Tag.NAME.toString(),
-        FLUSH_THRESHOLD);
     metricService.createAutoGauge(
         Metric.SCHEMA_ENGINE.toString(),
         MetricLevel.IMPORTANT,
@@ -109,17 +96,10 @@ public class SchemaEngineCachedMetric implements 
ISchemaEngineMetric {
     metricService.createAutoGauge(
         Metric.SCHEMA_ENGINE.toString(),
         MetricLevel.IMPORTANT,
-        CacheMemoryManager.getInstance(),
-        CacheMemoryManager::getReleaseThreadNum,
-        Tag.NAME.toString(),
-        RELEASE_THREAD_NUM);
-    metricService.createAutoGauge(
-        Metric.SCHEMA_ENGINE.toString(),
-        MetricLevel.IMPORTANT,
-        CacheMemoryManager.getInstance(),
-        CacheMemoryManager::getFlushThreadNum,
+        ReleaseFlushMonitor.getInstance(),
+        ReleaseFlushMonitor::getActiveWorkerNum,
         Tag.NAME.toString(),
-        FLUSH_THREAD_NUM);
+        RELEASE_FLUSH_THREAD_NUM);
   }
 
   @Override
@@ -127,8 +107,6 @@ public class SchemaEngineCachedMetric implements 
ISchemaEngineMetric {
     schemaEngineMemMetric.unbindFrom(metricService);
     metricService.remove(
         MetricType.GAUGE, Metric.SCHEMA_ENGINE.toString(), 
Tag.NAME.toString(), RELEASE_THRESHOLD);
-    metricService.remove(
-        MetricType.GAUGE, Metric.SCHEMA_ENGINE.toString(), 
Tag.NAME.toString(), FLUSH_THRESHOLD);
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.SCHEMA_ENGINE.toString(),
@@ -157,33 +135,6 @@ public class SchemaEngineCachedMetric implements 
ISchemaEngineMetric {
         MetricType.AUTO_GAUGE,
         Metric.SCHEMA_ENGINE.toString(),
         Tag.NAME.toString(),
-        RELEASE_THREAD_NUM);
-    metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.SCHEMA_ENGINE.toString(),
-        Tag.NAME.toString(),
-        FLUSH_THREAD_NUM);
-  }
-
-  public void recordFlush(long milliseconds) {
-    MetricService.getInstance()
-        .timer(
-            milliseconds,
-            TimeUnit.MILLISECONDS,
-            Metric.SCHEMA_ENGINE.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            FLUSH_TIMER);
-  }
-
-  public void recordRelease(long milliseconds) {
-    MetricService.getInstance()
-        .timer(
-            milliseconds,
-            TimeUnit.MILLISECONDS,
-            Metric.SCHEMA_ENGINE.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            RELEASE_TIMER);
+        RELEASE_FLUSH_THREAD_NUM);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
index 32ede440dbd..73e07ad00cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,7 +40,7 @@ public class SchemaMetricManager {
       SchemaEngineCachedMetric schemaEngineCachedMetric =
           new 
SchemaEngineCachedMetric(engineStatistics.getAsCachedSchemaEngineStatistics());
       engineMetric = schemaEngineCachedMetric;
-      
CacheMemoryManager.getInstance().setEngineMetric(schemaEngineCachedMetric);
+      
ReleaseFlushMonitor.getInstance().setEngineMetric(schemaEngineCachedMetric);
     }
     MetricService.getInstance().addMetricSet(engineMetric);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
index 56bf96deebf..385109ed471 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.schemaengine.rescon;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 
 public class SchemaResourceManager {
 
@@ -46,10 +46,10 @@ public class SchemaResourceManager {
   }
 
   private static void initSchemaFileModeResource(ISchemaEngineStatistics 
engineStatistics) {
-    CacheMemoryManager.getInstance().init(engineStatistics);
+    ReleaseFlushMonitor.getInstance().init(engineStatistics);
   }
 
   private static void clearSchemaFileModeResource() {
-    CacheMemoryManager.getInstance().clear();
+    ReleaseFlushMonitor.getInstance().clear();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index f9a80332187..f8f4b6f4ce5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -58,7 +58,7 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.logfile.SchemaLogWriter;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.logfile.visitor.SchemaRegionPlanDeserializer;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.logfile.visitor.SchemaRegionPlanSerializer;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.MTreeBelowSGCachedImpl;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowDevicesPlan;
 import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowNodesPlan;
@@ -580,7 +580,7 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws 
MetadataException {
     while (!regionStatistics.isAllowToCreateNewSeries()) {
-      CacheMemoryManager.getInstance().waitIfReleasing();
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
     }
 
     PartialPath path = plan.getPath();
@@ -666,7 +666,7 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
   public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) 
throws MetadataException {
     int seriesCount = plan.getMeasurements().size();
     while (!regionStatistics.isAllowToCreateNewSeries()) {
-      CacheMemoryManager.getInstance().waitIfReleasing();
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
     }
 
     try {
@@ -841,7 +841,7 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
   @Override
   public void createLogicalView(ICreateLogicalViewPlan plan) throws 
MetadataException {
     while (!regionStatistics.isAllowToCreateNewSeries()) {
-      CacheMemoryManager.getInstance().waitIfReleasing();
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
     }
     try {
       List<PartialPath> pathList = plan.getViewPathList();
@@ -1232,7 +1232,7 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
   public void activateSchemaTemplate(IActivateTemplateInClusterPlan plan, 
Template template)
       throws MetadataException {
     while (!regionStatistics.isAllowToCreateNewSeries()) {
-      CacheMemoryManager.getInstance().waitIfReleasing();
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
     }
     try {
       ICachedMNode deviceNode = 
getDeviceNodeWithAutoCreate(plan.getActivatePath());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
index 81f8867d80c..67af9efead9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.schema.node.IMNode;
 import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
 import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeIterator;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import org.apache.iotdb.db.schemaengine.template.Template;
 
 import java.io.File;
@@ -99,4 +100,6 @@ public interface IMTreeStore<N extends IMNode<N>> {
   void clear();
 
   boolean createSnapshot(File snapshotDir);
+
+  ReleaseFlushMonitor.RecordNode recordTraverserStatistics();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
index 2eab7c65563..22ee4235125 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterat
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MNodeIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MemoryTraverserIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.snapshot.MemMTreeSnapshotUtil;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
 import org.apache.iotdb.db.schemaengine.template.Template;
@@ -48,7 +49,7 @@ public class MemMTreeStore implements IMTreeStore<IMemMNode> {
 
   private final MemSchemaRegionStatistics regionStatistics;
   private final IMNodeFactory<IMemMNode> nodeFactory =
-      MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();;
+      MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();
 
   private IMemMNode root;
 
@@ -218,6 +219,12 @@ public class MemMTreeStore implements 
IMTreeStore<IMemMNode> {
         regionStatistics);
   }
 
+  @Override
+  public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+    // do nothing
+    return null;
+  }
+
   private void requestMemory(int size) {
     if (regionStatistics != null) {
       regionStatistics.requestMemory(size);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 76aafffd435..708173bca5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -31,8 +31,8 @@ import 
org.apache.iotdb.db.schemaengine.rescon.CachedSchemaRegionStatistics;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.estimator.MNodeSizeEstimator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.AbstractTraverserIterator;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush.PBTreeFlushExecutor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
@@ -66,10 +66,12 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
   private final ICacheManager cacheManager;
   private ISchemaFile file;
   private ICachedMNode root;
+  // TODO: delete it
   private final Runnable flushCallback;
   private final IMNodeFactory<ICachedMNode> nodeFactory =
       MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
   private final CachedSchemaRegionStatistics regionStatistics;
+  private final ReleaseFlushMonitor releaseFlushMonitor = 
ReleaseFlushMonitor.getInstance();
   private final LockManager lockManager = new LockManager();
 
   public CachedMTreeStore(
@@ -84,8 +86,7 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
     this.regionStatistics = regionStatistics;
     this.memManager = new MemManager(regionStatistics);
     this.flushCallback = flushCallback;
-    this.cacheManager =
-        CacheMemoryManager.getInstance().createLRUCacheManager(this, 
memManager, lockManager);
+    this.cacheManager = releaseFlushMonitor.createLRUCacheManager(this, 
memManager, lockManager);
     cacheManager.initRootStatus(root);
     regionStatistics.setCacheManager(cacheManager);
     ensureMemoryStatus();
@@ -108,6 +109,18 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
     return res;
   }
 
+  public ICacheManager getCacheManager() {
+    return cacheManager;
+  }
+
+  public ISchemaFile getSchemaFile() {
+    return file;
+  }
+
+  public LockManager getLockManager() {
+    return lockManager;
+  }
+
   @Override
   public ICachedMNode getRoot() {
     return root;
@@ -519,7 +532,7 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
   public void clear() {
     lockManager.globalWriteLock();
     try {
-      CacheMemoryManager.getInstance().clearCachedMTreeStore(this);
+      releaseFlushMonitor.clearCachedMTreeStore(this);
       regionStatistics.setCacheManager(null);
       cacheManager.clear(root);
       root = null;
@@ -559,6 +572,11 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
         snapshotDir, storageGroup, schemaRegionId, regionStatistics, 
flushCallback);
   }
 
+  @Override
+  public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+    return releaseFlushMonitor.recordTraverserTime(schemaRegionId);
+  }
+
   private CachedMTreeStore(
       File snapshotDir,
       String storageGroup,
@@ -572,15 +590,14 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
     this.regionStatistics = regionStatistics;
     this.memManager = new MemManager(regionStatistics);
     this.flushCallback = flushCallback;
-    this.cacheManager =
-        CacheMemoryManager.getInstance().createLRUCacheManager(this, 
memManager, lockManager);
+    this.cacheManager = releaseFlushMonitor.createLRUCacheManager(this, 
memManager, lockManager);
     cacheManager.initRootStatus(root);
     regionStatistics.setCacheManager(cacheManager);
     ensureMemoryStatus();
   }
 
   private void ensureMemoryStatus() {
-    CacheMemoryManager.getInstance().ensureMemoryStatus();
+    releaseFlushMonitor.ensureMemoryStatus();
   }
 
   public CachedSchemaRegionStatistics getRegionStatistics() {
@@ -606,43 +623,29 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
       lockManager.globalReadLock();
     }
     try {
-      boolean hasVolatileNodes = flushVolatileDBNode();
+      PBTreeFlushExecutor flushExecutor;
+      IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode =
+          cacheManager.collectUpdatedStorageGroupMNodes();
+      if (updatedStorageGroupMNode != null) {
+        flushExecutor =
+            new PBTreeFlushExecutor(updatedStorageGroupMNode, cacheManager, 
file, lockManager);
+        flushExecutor.flushDatabase();
+      }
 
       Iterator<ICachedMNode> volatileSubtrees = 
cacheManager.collectVolatileSubtrees();
-      if (volatileSubtrees.hasNext()) {
-        hasVolatileNodes = true;
-
-        long startTime = System.currentTimeMillis();
-
-        ICachedMNode subtreeRoot;
-        PBTreeFlushExecutor flushExecutor;
-        while (volatileSubtrees.hasNext()) {
-          subtreeRoot = volatileSubtrees.next();
-          flushExecutor =
-              new PBTreeFlushExecutor(subtreeRoot, needLock, cacheManager, 
file, lockManager);
-          flushExecutor.flushVolatileNodes();
-        }
-
-        long time = System.currentTimeMillis() - startTime;
-        if (time > 10_000) {
-          LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time, 
schemaRegionId);
-        } else {
-          LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", 
time, schemaRegionId);
-        }
-      }
 
-      if (hasVolatileNodes) {
-        flushCallback.run();
+      long startTime = System.currentTimeMillis();
+      flushExecutor = new PBTreeFlushExecutor(volatileSubtrees, cacheManager, 
file, lockManager);
+      flushExecutor.flushVolatileNodes();
+      long time = System.currentTimeMillis() - startTime;
+      if (time > 10_000) {
+        LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time, 
schemaRegionId);
+      } else {
+        LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", time, 
schemaRegionId);
       }
-
-      ensureMemoryStatus();
-    } catch (MetadataException | IOException e) {
-      LOGGER.warn(
-          "Exception occurred during MTree flush, current SchemaRegionId is 
{}", schemaRegionId, e);
     } catch (Throwable e) {
       LOGGER.error(
           "Error occurred during MTree flush, current SchemaRegionId is {}", 
schemaRegionId, e);
-      e.printStackTrace();
     } finally {
       if (needLock) {
         lockManager.globalReadUnlock();
@@ -650,25 +653,6 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
     }
   }
 
-  private boolean flushVolatileDBNode() throws IOException {
-    IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode =
-        cacheManager.collectUpdatedStorageGroupMNodes();
-    if (updatedStorageGroupMNode == null) {
-      return false;
-    }
-
-    try {
-      file.updateDatabaseNode(updatedStorageGroupMNode);
-      return true;
-    } catch (IOException e) {
-      LOGGER.warn(
-          "IOException occurred during updating StorageGroupMNode {}",
-          updatedStorageGroupMNode.getFullPath(),
-          e);
-      throw e;
-    }
-  }
-
   /**
    * Since any node R/W operation may change the memory status, thus it should 
be controlled during
    * iterating child nodes.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
index 969bcdcfce4..964731bd455 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
 import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeIterator;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import org.apache.iotdb.db.schemaengine.template.Template;
 
@@ -132,6 +133,11 @@ public class ReentrantReadOnlyCachedMTreeStore implements 
IMTreeStore<ICachedMNo
     throw new UnsupportedOperationException("ReadOnlyReentrantMTreeStore");
   }
 
+  @Override
+  public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+    return store.recordTraverserStatistics();
+  }
+
   public void unlockRead() {
     store.stampedReadUnlock(readLockStamp);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
deleted file mode 100644
index d5c6bec1e28..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
-import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
-import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
-import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-
-/**
- * CacheMemoryManager is used to register the CachedMTreeStore and create the 
CacheManager.
- * CacheMemoryManager provides the {@link 
CacheMemoryManager#ensureMemoryStatus} interface, which
- * starts asynchronous threads to free and flush the disk when memory usage 
exceeds a threshold.
- */
-public class CacheMemoryManager {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(CacheMemoryManager.class);
-
-  private final List<CachedMTreeStore> storeList = new 
CopyOnWriteArrayList<>();
-
-  private CachedSchemaEngineStatistics engineStatistics;
-  private SchemaEngineCachedMetric engineMetric;
-
-  private static final int CONCURRENT_NUM = 10;
-
-  private ExecutorService flushTaskProcessor;
-  private ExecutorService flushTaskMonitor;
-  private ExecutorService releaseTaskProcessor;
-  private ExecutorService releaseTaskMonitor;
-
-  private FiniteSemaphore flushSemaphore;
-  private FiniteSemaphore releaseSemaphore;
-
-  private volatile boolean hasFlushTask;
-
-  private volatile boolean hasReleaseTask;
-
-  private IReleaseFlushStrategy releaseFlushStrategy;
-
-  private static final int MAX_WAITING_TIME_WHEN_RELEASING = 3_000;
-  private final Object blockObject = new Object();
-
-  /**
-   * Create and allocate LRUCacheManager to the corresponding CachedMTreeStore.
-   *
-   * @param store CachedMTreeStore
-   * @return LRUCacheManager
-   */
-  public ICacheManager createLRUCacheManager(
-      CachedMTreeStore store, MemManager memManager, LockManager lockManager) {
-    ICacheManager cacheManager = new LRUCacheManager(memManager, lockManager);
-    storeList.add(store);
-    return cacheManager;
-  }
-
-  public void clearCachedMTreeStore(CachedMTreeStore store) {
-    storeList.remove(store);
-  }
-
-  public void init(ISchemaEngineStatistics engineStatistics) {
-    flushSemaphore = new FiniteSemaphore(2, 0);
-    releaseSemaphore = new FiniteSemaphore(2, 0);
-    this.engineStatistics = 
engineStatistics.getAsCachedSchemaEngineStatistics();
-    if 
(IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInPBTreeMode() >= 
0) {
-      releaseFlushStrategy = new 
ReleaseFlushStrategyNumBasedImpl(this.engineStatistics);
-    } else {
-      releaseFlushStrategy = new 
ReleaseFlushStrategySizeBasedImpl(this.engineStatistics);
-    }
-    flushTaskMonitor =
-        
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_FLUSH_MONITOR.getName());
-    flushTaskProcessor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            CONCURRENT_NUM, 
ThreadName.SCHEMA_REGION_FLUSH_PROCESSOR.getName());
-    releaseTaskMonitor =
-        
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_RELEASE_MONITOR.getName());
-    releaseTaskProcessor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            CONCURRENT_NUM, 
ThreadName.SCHEMA_REGION_RELEASE_PROCESSOR.getName());
-    releaseTaskMonitor.submit(
-        () -> {
-          try {
-            while (!Thread.currentThread().isInterrupted()) {
-              releaseSemaphore.acquire();
-              try {
-                if (isExceedReleaseThreshold()) {
-                  hasReleaseTask = true;
-                  tryExecuteMemoryRelease();
-                }
-              } catch (Throwable throwable) {
-                hasReleaseTask = false;
-                logger.error("Something wrong happened during MTree release.", 
throwable);
-              }
-            }
-          } catch (InterruptedException e) {
-            logger.info("ReleaseTaskMonitor thread is interrupted.");
-            Thread.currentThread().interrupt();
-          }
-        });
-    flushTaskMonitor.submit(
-        () -> {
-          try {
-            while (!Thread.currentThread().isInterrupted()) {
-              flushSemaphore.acquire();
-              try {
-                if (isExceedFlushThreshold()) {
-                  hasFlushTask = true;
-                  tryFlushVolatileNodes();
-                }
-              } catch (Throwable throwable) {
-                hasFlushTask = false;
-                logger.error("Something wrong happened during MTree flush.", 
throwable);
-              }
-            }
-          } catch (InterruptedException e) {
-            logger.info("FlushTaskMonitor thread is interrupted.");
-            Thread.currentThread().interrupt();
-          }
-        });
-  }
-
-  public void setEngineMetric(SchemaEngineCachedMetric engineMetric) {
-    this.engineMetric = engineMetric;
-  }
-
-  public boolean isExceedReleaseThreshold() {
-    return releaseFlushStrategy.isExceedReleaseThreshold();
-  }
-
-  public boolean isExceedFlushThreshold() {
-    return releaseFlushStrategy.isExceedFlushThreshold();
-  }
-
-  /**
-   * Check the current memory usage. If the release threshold is exceeded, 
trigger the task to
-   * perform an internal and external memory swap to release the memory.
-   */
-  public void ensureMemoryStatus() {
-    if (isExceedReleaseThreshold()) {
-      registerReleaseTask();
-    }
-  }
-
-  /**
-   * If there is a ReleaseTask or FlushTask, block the current thread to wait 
up to
-   * MAX_WAITING_TIME_WHEN_RELEASING. The thread will be woken up if the 
ReleaseTask or FlushTask
-   * ends or the wait time exceeds MAX_WAITING_TIME_WHEN_RELEASING.
-   */
-  public void waitIfReleasing() {
-    synchronized (blockObject) {
-      if (hasReleaseTask || hasFlushTask) {
-        try {
-          blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
-        } catch (InterruptedException e) {
-          logger.warn(
-              "Interrupt because the release task and flush task did not 
finish within {} milliseconds.",
-              MAX_WAITING_TIME_WHEN_RELEASING);
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-  }
-
-  private void registerReleaseTask() {
-    releaseSemaphore.release();
-  }
-
-  /**
-   * Execute cache eviction until the memory status is under safe mode or no 
node could be evicted.
-   * If the memory status is still full, which means the nodes in memory are 
all volatile nodes, new
-   * added or updated, fire flush task.
-   */
-  private void tryExecuteMemoryRelease() {
-    long startTime = System.currentTimeMillis();
-    CompletableFuture.allOf(
-            storeList.stream()
-                .map(
-                    store ->
-                        CompletableFuture.runAsync(
-                            () -> {
-                              executeMemoryRelease(store);
-                            },
-                            releaseTaskProcessor))
-                .toArray(CompletableFuture[]::new))
-        .join();
-    if (engineMetric != null) {
-      engineMetric.recordRelease(System.currentTimeMillis() - startTime);
-    }
-    synchronized (blockObject) {
-      hasReleaseTask = false;
-      if (isExceedFlushThreshold()) {
-        registerFlushTask();
-      } else {
-        blockObject.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * Keep fetching evictable nodes from cacheManager until the memory status 
is under safe mode or
-   * no node could be evicted. Update the memory status after evicting each 
node.
-   */
-  private void executeMemoryRelease(CachedMTreeStore store) {
-    while (isExceedReleaseThreshold()) {
-      // store try to release memory if not exceed release threshold
-      if (store.executeMemoryRelease()) {
-        // if store can not release memory, break
-        break;
-      }
-    }
-  }
-
-  private void registerFlushTask() {
-    flushSemaphore.release();
-  }
-
-  /** Sync all volatile nodes to schemaFile and execute memory release after 
flush. */
-  private void tryFlushVolatileNodes() {
-    long startTime = System.currentTimeMillis();
-    CompletableFuture.allOf(
-            storeList.stream()
-                .map(
-                    store ->
-                        CompletableFuture.runAsync(
-                            () -> {
-                              store.flushVolatileNodes(true);
-                            },
-                            flushTaskProcessor))
-                .toArray(CompletableFuture[]::new))
-        .join();
-    if (engineMetric != null) {
-      engineMetric.recordFlush(System.currentTimeMillis() - startTime);
-    }
-    synchronized (blockObject) {
-      hasFlushTask = false;
-      blockObject.notifyAll();
-    }
-  }
-
-  public void clear() {
-    if (releaseTaskMonitor != null) {
-      releaseTaskMonitor.shutdownNow();
-      while (true) {
-        if (releaseTaskMonitor.isTerminated()) break;
-      }
-      releaseTaskMonitor = null;
-    }
-    if (flushTaskMonitor != null) {
-      flushTaskMonitor.shutdownNow();
-      while (true) {
-        if (flushTaskMonitor.isTerminated()) break;
-      }
-      releaseTaskMonitor = null;
-    }
-    if (releaseTaskProcessor != null) {
-      while (true) {
-        if (!hasReleaseTask) break;
-      }
-      releaseTaskProcessor.shutdown();
-      while (true) {
-        if (releaseTaskProcessor.isTerminated()) break;
-      }
-      releaseTaskProcessor = null;
-    }
-    // the release task may submit flush task, thus must be shut down and 
clear first
-    if (flushTaskProcessor != null) {
-      while (true) {
-        if (!hasFlushTask) break;
-      }
-      flushTaskProcessor.shutdown();
-      while (true) {
-        if (flushTaskProcessor.isTerminated()) break;
-      }
-      flushTaskProcessor = null;
-    }
-    storeList.clear();
-    releaseFlushStrategy = null;
-    engineStatistics = null;
-    releaseSemaphore = null;
-    flushSemaphore = null;
-    engineMetric = null;
-  }
-
-  public int getReleaseThreadNum() {
-    return ((WrappedThreadPoolExecutor) releaseTaskProcessor).getActiveCount();
-  }
-
-  public int getFlushThreadNum() {
-    return ((WrappedThreadPoolExecutor) flushTaskProcessor).getActiveCount();
-  }
-
-  private CacheMemoryManager() {}
-
-  private static class GlobalCacheManagerHolder {
-    private static final CacheMemoryManager INSTANCE = new 
CacheMemoryManager();
-
-    private GlobalCacheManagerHolder() {}
-  }
-
-  public static CacheMemoryManager getInstance() {
-    return CacheMemoryManager.GlobalCacheManagerHolder.INSTANCE;
-  }
-
-  @TestOnly
-  public void forceFlushAndRelease() {
-    releaseFlushStrategy =
-        new IReleaseFlushStrategy() {
-          @Override
-          public boolean isExceedReleaseThreshold() {
-            return true;
-          }
-
-          @Override
-          public boolean isExceedFlushThreshold() {
-            return true;
-          }
-        };
-    registerFlushTask();
-    registerReleaseTask();
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ReleaseFlushMonitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ReleaseFlushMonitor.java
new file mode 100644
index 00000000000..5837713e42a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ReleaseFlushMonitor.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
+import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
+import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush.Scheduler;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
+import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * ReleaseFlushMonitor is used to register the CachedMTreeStore and create the 
CacheManager.
+ * ReleaseFlushMonitor provides the {@link 
ReleaseFlushMonitor#ensureMemoryStatus} interface, which
+ * starts asynchronous threads to free and flush the disk when memory usage 
exceeds a threshold.
+ */
+public class ReleaseFlushMonitor {
+  private static final Logger logger = 
LoggerFactory.getLogger(ReleaseFlushMonitor.class);
+
+  /** configuration */
+  private static final double FREE_FLUSH_PROPORTION = 0.2;
+
+  private static final int MONITOR_INETRVAL_MILLISECONDS = 5000;
+  private static final int MAX_WAITING_TIME_WHEN_RELEASING = 3_000;
+
+  /** data structure */
+  private final Map<Integer, RecordList> regionToTraverserTime = new 
ConcurrentHashMap<>();
+
+  private final Map<Integer, CachedMTreeStore> regionToStoreMap = new 
ConcurrentHashMap<>();
+  private final Set<Integer> flushingRegionSet = new CopyOnWriteArraySet<>();
+  private CachedSchemaEngineStatistics engineStatistics;
+  private SchemaEngineCachedMetric engineMetric;
+  private IReleaseFlushStrategy releaseFlushStrategy;
+
+  /** thread and lock */
+  private final Object blockObject = new Object();
+
+  private ScheduledExecutorService flushMonitor;
+  private ExecutorService releaseMonitor;
+  private FiniteSemaphore releaseSemaphore;
+  private Scheduler scheduler;
+
+  /**
+   * Create and allocate LRUCacheManager to the corresponding CachedMTreeStore.
+   *
+   * @param store CachedMTreeStore
+   * @return LRUCacheManager
+   */
+  public ICacheManager createLRUCacheManager(
+      CachedMTreeStore store, MemManager memManager, LockManager lockManager) {
+    ICacheManager cacheManager = new LRUCacheManager(memManager, lockManager);
+    regionToStoreMap.put(store.getRegionStatistics().getSchemaRegionId(), 
store);
+    regionToTraverserTime.put(store.getRegionStatistics().getSchemaRegionId(), 
new RecordList());
+    return cacheManager;
+  }
+
+  public void clearCachedMTreeStore(CachedMTreeStore store) {
+    regionToStoreMap.remove(store.getRegionStatistics().getSchemaRegionId());
+  }
+
+  public void init(ISchemaEngineStatistics engineStatistics) {
+    releaseSemaphore = new FiniteSemaphore(2, 0);
+    this.engineStatistics = 
engineStatistics.getAsCachedSchemaEngineStatistics();
+    if 
(IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInPBTreeMode() >= 
0) {
+      releaseFlushStrategy = new 
ReleaseFlushStrategyNumBasedImpl(this.engineStatistics);
+    } else {
+      releaseFlushStrategy = new 
ReleaseFlushStrategySizeBasedImpl(this.engineStatistics);
+    }
+    scheduler = new Scheduler(regionToStoreMap, flushingRegionSet, 
releaseFlushStrategy);
+    releaseMonitor =
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.PBTREE_RELEASE_MONITOR.getName());
+    flushMonitor =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.PBTREE_FLUSH_MONITOR.getName());
+    releaseMonitor.submit(
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              releaseSemaphore.acquire();
+              // 1. first, it will try to release node cache
+              if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+                scheduler.scheduleRelease(false);
+                // 2. if it still exceeds release threshold, it will try to 
flush node buffer, then
+                // release node cache again
+                if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+                  scheduler.forceFlushAll();
+                  regionToTraverserTime.values().forEach(RecordList::clear);
+                  scheduler.scheduleRelease(false);
+                }
+                synchronized (blockObject) {
+                  // invoke the notifyAll() method to wake up the thread 
waiting for the release
+                  blockObject.notifyAll();
+                }
+              }
+            }
+          } catch (InterruptedException e) {
+            logger.info("ReleaseTaskMonitor thread is interrupted.");
+            Thread.currentThread().interrupt();
+          }
+        });
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        flushMonitor,
+        () -> {
+          if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+            releaseSemaphore.release();
+          } else {
+            
scheduler.scheduleFlush(getRegionsToFlush(System.currentTimeMillis()));
+          }
+        },
+        MONITOR_INETRVAL_MILLISECONDS,
+        MONITOR_INETRVAL_MILLISECONDS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void setEngineMetric(SchemaEngineCachedMetric engineMetric) {
+    this.engineMetric = engineMetric;
+  }
+
+  /**
+   * Check the current memory usage. If the release threshold is exceeded, 
trigger the task to
+   * perform an internal and external memory swap to release the memory.
+   */
+  public void ensureMemoryStatus() {
+    if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+      releaseSemaphore.release();
+    }
+  }
+
+  /**
+   * Block the current thread for up to MAX_WAITING_TIME_WHEN_RELEASING 
milliseconds. The thread
+   * is woken up earlier if the release monitor completes a release cycle (it 
invokes notifyAll on
+   * the shared block object) before the wait time elapses.
+   */
+  public void waitIfReleasing() {
+    synchronized (blockObject) {
+      try {
+        blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
+      } catch (InterruptedException e) {
+        logger.warn(
+            "Interrupt because the release task and flush task did not finish 
within {} milliseconds.",
+            MAX_WAITING_TIME_WHEN_RELEASING);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public RecordNode recordTraverserTime(int regionId) {
+    return regionToTraverserTime.get(regionId).createAndAddToTail();
+  }
+
+  @TestOnly
+  public void initRecordList(int regionId) {
+    regionToTraverserTime.computeIfAbsent(regionId, k -> new RecordList());
+  }
+
+  public List<Integer> getRegionsToFlush(long windowsEndTime) {
+    long windowsStartTime = windowsEndTime - MONITOR_INETRVAL_MILLISECONDS;
+    List<Pair<Integer, Long>> regionAndFreeTimeList = new ArrayList<>();
+    for (Map.Entry<Integer, RecordList> entry : 
regionToTraverserTime.entrySet()) {
+      int regionId = entry.getKey();
+      long traverserEndTime = windowsStartTime;
+      long traverserFreeTime = 0;
+      RecordList recordList = entry.getValue();
+      Iterator<RecordNode> iterator = recordList.iterator();
+      while (iterator.hasNext()) {
+        RecordNode recordNode = iterator.next();
+        if (recordNode.startTime > windowsEndTime) {
+          break;
+        }
+        if (recordNode.startTime > traverserEndTime) {
+          traverserFreeTime += (recordNode.startTime - traverserEndTime);
+          traverserEndTime = recordNode.endTime;
+        } else if (recordNode.endTime > traverserEndTime) {
+          traverserEndTime = recordNode.endTime;
+        }
+        if (recordNode.endTime < windowsStartTime) {
+          iterator.remove();
+        } else if (recordNode.endTime >= windowsEndTime) {
+          break;
+        }
+      }
+      if (traverserEndTime < windowsEndTime) {
+        traverserFreeTime += (windowsEndTime - traverserEndTime);
+      }
+      if (traverserFreeTime > FREE_FLUSH_PROPORTION * 
MONITOR_INETRVAL_MILLISECONDS) {
+        regionAndFreeTimeList.add(new Pair<>(regionId, traverserFreeTime));
+      }
+    }
+    regionAndFreeTimeList.sort(Comparator.comparing((Pair<Integer, Long> o) -> 
o.right).reversed());
+    return 
regionAndFreeTimeList.stream().map(Pair::getLeft).collect(Collectors.toList());
+  }
+
+  @TestOnly
+  public void forceFlushAndRelease() {
+    scheduler.forceFlushAll();
+    scheduler.scheduleRelease(true);
+  }
+
+  public void clear() {
+    if (releaseMonitor != null) {
+      releaseMonitor.shutdownNow();
+      while (true) {
+        if (releaseMonitor.isTerminated()) break;
+      }
+      releaseMonitor = null;
+    }
+    if (flushMonitor != null) {
+      flushMonitor.shutdownNow();
+      while (true) {
+        if (flushMonitor.isTerminated()) break;
+      }
+      flushMonitor = null;
+    }
+    if (scheduler != null) {
+      scheduler.clear();
+      while (true) {
+        if (scheduler.isTerminated()) break;
+      }
+      scheduler = null;
+    }
+    regionToStoreMap.clear();
+    flushingRegionSet.clear();
+    regionToTraverserTime.clear();
+    releaseFlushStrategy = null;
+    engineStatistics = null;
+    releaseSemaphore = null;
+    engineMetric = null;
+  }
+
+  public int getActiveWorkerNum() {
+    return scheduler.getActiveWorkerNum();
+  }
+
+  private ReleaseFlushMonitor() {}
+
+  private static class ReleaseFlushMonitorHolder {
+    private static final ReleaseFlushMonitor INSTANCE = new 
ReleaseFlushMonitor();
+
+    private ReleaseFlushMonitorHolder() {}
+  }
+
+  public static ReleaseFlushMonitor getInstance() {
+    return ReleaseFlushMonitor.ReleaseFlushMonitorHolder.INSTANCE;
+  }
+
+  @NotThreadSafe
+  private static class RecordList {
+    // The start time of RecordNode is incremental from head to tail
+    private final RecordNode head = new RecordNode();
+    private final RecordNode tail = new RecordNode();
+
+    private RecordList() {
+      head.next = tail;
+      tail.prev = head;
+    }
+
+    private synchronized RecordNode createAndAddToTail() {
+      RecordNode recordNode = new RecordNode();
+      recordNode.prev = tail.prev;
+      recordNode.next = tail;
+      tail.prev.next = recordNode;
+      tail.prev = recordNode;
+      return recordNode;
+    }
+
+    private synchronized void remove(RecordNode recordNode) {
+      recordNode.prev.next = recordNode.next;
+      recordNode.next.prev = recordNode.prev;
+      recordNode.prev = null;
+      recordNode.next = null;
+    }
+
+    private synchronized void clear() {
+      head.next = tail;
+      tail.prev = head;
+    }
+
+    private Iterator<RecordNode> iterator() {
+      return new Iterator<RecordNode>() {
+        private RecordNode next = null;
+        private RecordNode cur = head;
+
+        @Override
+        public boolean hasNext() {
+          if (next == null && cur.next != tail) {
+            next = cur.next;
+          }
+          return next != null;
+        }
+
+        @Override
+        public RecordNode next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          cur = next;
+          next = null;
+          return cur;
+        }
+
+        @Override
+        public void remove() {
+          if (next == null && cur.next != tail) {
+            next = cur.next;
+          }
+          RecordList.this.remove(cur);
+        }
+      };
+    }
+  }
+
+  public static class RecordNode {
+    private RecordNode prev = null;
+    private RecordNode next = null;
+    private Long startTime = System.currentTimeMillis();
+    private Long endTime = Long.MAX_VALUE;
+
+    @TestOnly
+    public void setStartTime(Long startTime) {
+      this.startTime = startTime;
+    }
+
+    public void setEndTime(Long endTime) {
+      this.endTime = endTime;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
index 614c28087fa..b1b59b630c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush;
 
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -39,43 +41,85 @@ public class PBTreeFlushExecutor {
 
   private static final Logger logger = 
LoggerFactory.getLogger(PBTreeFlushExecutor.class);
 
-  private final ICachedMNode subtreeRoot;
+  private final Iterator<ICachedMNode> subtreeRoots;
 
   private final ICacheManager cacheManager;
-
   private final ISchemaFile file;
-
   private final LockManager lockManager;
 
   public PBTreeFlushExecutor(
-      ICachedMNode subtreeRoot,
-      boolean needLock,
+      Iterator<ICachedMNode> subtreeRoots,
+      ICacheManager cacheManager,
+      ISchemaFile file,
+      LockManager lockManager) {
+    this.subtreeRoots = subtreeRoots;
+    this.cacheManager = cacheManager;
+    this.file = file;
+    this.lockManager = lockManager;
+  }
+
+  public PBTreeFlushExecutor(
+      IDatabaseMNode<ICachedMNode> databaseMNode,
       ICacheManager cacheManager,
       ISchemaFile file,
       LockManager lockManager) {
-    this.subtreeRoot = subtreeRoot;
+    this.subtreeRoots = 
Collections.singletonList(databaseMNode.getAsMNode()).iterator();
     this.cacheManager = cacheManager;
     this.file = file;
     this.lockManager = lockManager;
   }
 
-  public void flushVolatileNodes() throws MetadataException, IOException {
+  public void flushVolatileNodes() throws MetadataException {
+    List<Exception> exceptions = new ArrayList<>();
+    while (subtreeRoots.hasNext()) {
+      try {
+        processFlushNonDatabase(subtreeRoots.next());
+      } catch (Exception e) {
+        exceptions.add(e);
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw new MetadataException(
+          exceptions.stream().map(Exception::getMessage).reduce("", (a, b) -> 
a + ", " + b));
+    }
+  }
+
+  public void flushDatabase() throws IOException {
+    while (subtreeRoots.hasNext()) {
+      ICachedMNode subtreeRoot = subtreeRoots.next();
+      processFlushDatabase(subtreeRoot.getAsDatabaseMNode());
+    }
+  }
+
+  private void processFlushDatabase(IDatabaseMNode<ICachedMNode> 
updatedStorageGroupMNode)
+      throws IOException {
+    try {
+      file.updateDatabaseNode(updatedStorageGroupMNode);
+    } catch (IOException e) {
+      logger.warn(
+          "IOException occurred during updating StorageGroupMNode {}",
+          updatedStorageGroupMNode.getFullPath(),
+          e);
+      throw e;
+    }
+  }
+
+  private void processFlushNonDatabase(ICachedMNode subtreeRoot)
+      throws MetadataException, IOException {
     Iterator<ICachedMNode> volatileSubtreeIterator;
     List<ICachedMNode> collectedVolatileSubtrees;
     try {
-      file.writeMNode(this.subtreeRoot);
+      file.writeMNode(subtreeRoot);
       collectedVolatileSubtrees = new ArrayList<>();
       volatileSubtreeIterator =
-          
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(this.subtreeRoot);
+          
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(subtreeRoot);
       while (volatileSubtreeIterator.hasNext()) {
         collectedVolatileSubtrees.add(volatileSubtreeIterator.next());
       }
     } catch (MetadataException | IOException e) {
       logger.warn(
-          "Error occurred during MTree flush, current node is {}",
-          this.subtreeRoot.getFullPath(),
-          e);
-      cacheManager.updateCacheStatusAfterFlushFailure(this.subtreeRoot);
+          "Error occurred during MTree flush, current node is {}", 
subtreeRoot.getFullPath(), e);
+      cacheManager.updateCacheStatusAfterFlushFailure(subtreeRoot);
       throw e;
     } finally {
       lockManager.writeUnlock(subtreeRoot);
@@ -85,7 +129,6 @@ public class PBTreeFlushExecutor {
     volatileSubtreeStack.push(collectedVolatileSubtrees.iterator());
 
     Iterator<ICachedMNode> subtreeIterator;
-    ICachedMNode subtreeRoot;
     while (!volatileSubtreeStack.isEmpty()) {
       subtreeIterator = volatileSubtreeStack.peek();
       if (!subtreeIterator.hasNext()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
new file mode 100644
index 00000000000..0ef7a95d3c3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Scheduler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Scheduler.class);
+
+  /** configuration */
+  private int BATCH_FLUSH_SUBTREE = 50;
+
+  private int FLUSH_WORKER_NUM = 10;
+
+  /** data structure */
+  private final Map<Integer, CachedMTreeStore> regionToStore;
+  // flushingRegionSet prevents the same region from being flushed concurrently;
+  // updates to it are performed only inside synchronized methods
+  private final Set<Integer> flushingRegionSet;
+
+  private final ExecutorService workerPool;
+  private final IReleaseFlushStrategy releaseFlushStrategy;
+
+  public Scheduler(
+      Map<Integer, CachedMTreeStore> regionToStore,
+      Set<Integer> flushingRegionSet,
+      IReleaseFlushStrategy releaseFlushStrategy) {
+    this.regionToStore = regionToStore;
+    // When the worker pool cannot accept a new task, the task is silently
+    // discarded (DiscardPolicy).
+    this.workerPool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            FLUSH_WORKER_NUM,
+            ThreadName.PBTREE_FLUSH_PROCESSOR.getName(),
+            new ThreadPoolExecutor.DiscardPolicy());
+    this.flushingRegionSet = flushingRegionSet;
+    this.releaseFlushStrategy = releaseFlushStrategy;
+  }
+
+  /**
+   * Force flush all volatile subtrees and updated database MNodes to disk. 
After flushing, the
+   * MNodes will be placed into node cache. This method will return 
synchronously after all stores
+   * are successfully flushed.
+   */
+  public synchronized void forceFlushAll() {
+    List<Map.Entry<Integer, CachedMTreeStore>> flushEngineList = new 
ArrayList<>();
+    for (Map.Entry<Integer, CachedMTreeStore> entry : 
regionToStore.entrySet()) {
+      if (flushingRegionSet.contains(entry.getKey())) {
+        continue;
+      }
+      flushingRegionSet.add(entry.getKey());
+      flushEngineList.add(entry);
+    }
+    CompletableFuture.allOf(
+            flushEngineList.stream()
+                .map(
+                    entry ->
+                        CompletableFuture.runAsync(
+                            () -> {
+                              CachedMTreeStore store = entry.getValue();
+                              int regionId = entry.getKey();
+                              ICacheManager cacheManager = 
store.getCacheManager();
+                              ISchemaFile file = store.getSchemaFile();
+                              LockManager lockManager = store.getLockManager();
+                              long startTime = System.currentTimeMillis();
+                              PBTreeFlushExecutor flushExecutor;
+                              IDatabaseMNode<ICachedMNode> dbNode =
+                                  
cacheManager.collectUpdatedStorageGroupMNodes();
+                              if (dbNode != null) {
+                                flushExecutor =
+                                    new PBTreeFlushExecutor(
+                                        dbNode, cacheManager, file, 
lockManager);
+                                try {
+                                  flushExecutor.flushDatabase();
+                                } catch (IOException e) {
+                                  LOGGER.warn(
+                                      "Error occurred during MTree flush, 
current SchemaRegionId is {} because {}",
+                                      regionId,
+                                      e.getMessage(),
+                                      e);
+                                }
+                              }
+                              flushExecutor =
+                                  new PBTreeFlushExecutor(
+                                      cacheManager.collectVolatileSubtrees(),
+                                      cacheManager,
+                                      file,
+                                      lockManager);
+                              try {
+                                flushExecutor.flushVolatileNodes();
+                              } catch (MetadataException e) {
+                                LOGGER.warn(
+                                    "Error occurred during MTree flush, 
current SchemaRegionId is {} because {}",
+                                    regionId,
+                                    e.getMessage(),
+                                    e);
+                              } finally {
+                                long time = System.currentTimeMillis() - 
startTime;
+                                if (time > 10_000) {
+                                  LOGGER.info(
+                                      "It takes {}ms to flush MTree in 
SchemaRegion {}",
+                                      time,
+                                      regionId);
+                                } else {
+                                  LOGGER.debug(
+                                      "It takes {}ms to flush MTree in 
SchemaRegion {}",
+                                      time,
+                                      regionId);
+                                }
+                                flushingRegionSet.remove(regionId);
+                              }
+                            },
+                            workerPool))
+                .toArray(CompletableFuture[]::new))
+        .join();
+  }
+
+  /**
+   * Keeps evicting nodes from the cache manager until memory usage is back in the
+   * safe range or no more nodes can be evicted. The memory status is updated after
+   * each eviction.
+   *
+   * @param force true if force to evict all cache
+   */
+  public synchronized void scheduleRelease(boolean force) {
+    CompletableFuture.allOf(
+            regionToStore.values().stream()
+                .map(
+                    store ->
+                        CompletableFuture.runAsync(
+                            () -> {
+                              while (force || 
releaseFlushStrategy.isExceedReleaseThreshold()) {
+                                // ask the store to release one batch of memory
+                                if (store.executeMemoryRelease()) {
+                                  // stop once the store cannot release any more memory
+                                  break;
+                                }
+                              }
+                            },
+                            workerPool))
+                .toArray(CompletableFuture[]::new))
+        .join();
+  }
+
+  /**
+   * Select some subtrees to flush. The subtrees are selected from the 
MTreeStore by the sequence of
+   * the regionIds. The number of subtrees to flush is determined by parameter 
{@link
+   * Scheduler#BATCH_FLUSH_SUBTREE}. This method returns asynchronously; if the
+   * worker pool is full, the task is discarded immediately.
+   *
+   * @param regionIds determine the MTreeStore to select subtrees, the head of 
the list is the first
+   *     MTreeStore to select subtrees
+   */
+  public synchronized void scheduleFlush(List<Integer> regionIds) {
+    AtomicInteger remainToFlush = new AtomicInteger(BATCH_FLUSH_SUBTREE);
+    for (int regionId : regionIds) {
+      if (flushingRegionSet.contains(regionId)) {
+        continue;
+      }
+      flushingRegionSet.add(regionId);
+      workerPool.submit(
+          () -> {
+            CachedMTreeStore store = regionToStore.get(regionId);
+            ICacheManager cacheManager = store.getCacheManager();
+            ISchemaFile file = store.getSchemaFile();
+            LockManager lockManager = store.getLockManager();
+            List<ICachedMNode> nodesToFlush = new ArrayList<>();
+            PBTreeFlushExecutor flushExecutor;
+            long startTime = System.currentTimeMillis();
+            try {
+              IDatabaseMNode<ICachedMNode> dbNode = 
cacheManager.collectUpdatedStorageGroupMNodes();
+              if (dbNode != null) {
+                flushExecutor = new PBTreeFlushExecutor(dbNode, cacheManager, 
file, lockManager);
+                flushExecutor.flushDatabase();
+                remainToFlush.decrementAndGet();
+              }
+              Iterator<ICachedMNode> volatileSubtrees = 
cacheManager.collectVolatileSubtrees();
+              while (volatileSubtrees.hasNext()) {
+                nodesToFlush.add(volatileSubtrees.next());
+                if (nodesToFlush.size() > remainToFlush.get()) {
+                  break;
+                }
+              }
+              flushExecutor =
+                  new PBTreeFlushExecutor(nodesToFlush.iterator(), 
cacheManager, file, lockManager);
+              flushExecutor.flushVolatileNodes();
+            } catch (MetadataException | IOException e) {
+              LOGGER.warn(
+                  "Error occurred during MTree flush, current SchemaRegionId 
is {} because {}",
+                  regionId,
+                  e.getMessage(),
+                  e);
+            } finally {
+              long time = System.currentTimeMillis() - startTime;
+              if (time > 10_000) {
+                LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", 
time, regionId);
+              } else {
+                LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion 
{}", time, regionId);
+              }
+              remainToFlush.addAndGet(-nodesToFlush.size());
+              flushingRegionSet.remove(regionId);
+            }
+          });
+      if (remainToFlush.get() <= 0) {
+        break;
+      }
+    }
+  }
+
+  public int getActiveWorkerNum() {
+    return ((WrappedThreadPoolExecutor) workerPool).getActiveCount();
+  }
+
+  public void clear() {
+    workerPool.shutdown();
+  }
+
+  public boolean isTerminated() {
+    return workerPool.isTerminated();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
index 83961523af9..fcca70de3e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
@@ -91,7 +91,7 @@ public class StampedWriterPreferredLock {
    * re-entry within the same thread. Return directly if no thread holds a 
write lock ; block and
    * wait if another thread holds a write lock.
    *
-   * @param prior If false, it will also block and * wait if the write lock 
waiting queue is not
+   * @param prior If false, it will also block and wait if the write lock 
waiting queue is not
    *     empty.
    */
   public void threadReadLock(boolean prior) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
index b9a584074aa..7d40fd3687b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
@@ -22,7 +22,4 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontr
 public interface IReleaseFlushStrategy {
   /** Check if exceed release threshold */
   boolean isExceedReleaseThreshold();
-
-  /** Check if exceed flush threshold */
-  boolean isExceedFlushThreshold();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
index fae84ec10a7..a3d5270a2fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
@@ -38,9 +38,4 @@ public class ReleaseFlushStrategyNumBasedImpl implements 
IReleaseFlushStrategy {
     return engineStatistics.getPinnedMNodeNum() + 
engineStatistics.getUnpinnedMNodeNum()
         > capacity * 0.6;
   }
-
-  @Override
-  public boolean isExceedFlushThreshold() {
-    return engineStatistics.getPinnedMNodeNum() + 
engineStatistics.getUnpinnedMNodeNum() > capacity;
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
index 266959c1b17..c0be7203f2a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
@@ -27,25 +27,16 @@ public class ReleaseFlushStrategySizeBasedImpl implements 
IReleaseFlushStrategy
   private final CachedSchemaEngineStatistics engineStatistics;
 
   private final long releaseThreshold;
-  private final long flushThreshold;
-
-  public static final double RELEASE_THRESHOLD_RATIO = 0.6;
-  public static final double FLUSH_THRESHOLD_RATION = 0.75;
+  public static final double RELEASE_THRESHOLD_RATIO = 0.70;
 
   public ReleaseFlushStrategySizeBasedImpl(CachedSchemaEngineStatistics 
engineStatistics) {
     this.engineStatistics = engineStatistics;
     long capacity = 
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion();
     this.releaseThreshold = (long) (capacity * RELEASE_THRESHOLD_RATIO);
-    this.flushThreshold = (long) (capacity * FLUSH_THRESHOLD_RATION);
   }
 
   @Override
   public boolean isExceedReleaseThreshold() {
     return engineStatistics.getMemoryUsage() > releaseThreshold;
   }
-
-  @Override
-  public boolean isExceedFlushThreshold() {
-    return engineStatistics.getMemoryUsage() > flushThreshold;
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 015bd38fb06..27a634d6443 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -324,19 +324,14 @@ public abstract class PageManager implements IPageManager 
{
       if (!child.isMeasurement()) {
         alias = null;
 
-        if (getNodeAddress(child) >= 0) {
-          // new child with a valid segment address, weird
-          throw new MetadataException(
-              String.format(
-                  "A child [%s] in newChildBuffer shall not have 
segmentAddress.",
-                  child.getFullPath()));
-        }
-
-        // pre-allocate except that child is a device node using template
-        if (!(child.isDevice() && child.getAsDeviceMNode().isUseTemplate())) {
+        // TODO: optimize the case where many devices only use a template and
+        // have no children
+        if (getNodeAddress(child) < 0) {
           short estSegSize = estimateSegmentSize(child);
           long glbIndex = preAllocateSegment(estSegSize, cxt);
           SchemaFile.setNodeAddress(child, glbIndex);
+        } else {
+          // a new child must not already carry a valid segment address; one that
+          // does suggests the buffer was corrupted or maliciously modified
+          throw new MetadataException("A child in newChildBuffer shall not 
have segmentAddress.");
         }
       } else {
         alias =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
index b116bc1950b..86b1524bf75 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.commons.schema.tree.AbstractTreeVisitor;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MNodeIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.ReentrantReadOnlyCachedMTreeStore;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
 import org.apache.iotdb.db.schemaengine.template.Template;
 
@@ -75,6 +76,8 @@ public abstract class Traverser<R, N extends IMNode<N>> 
extends AbstractTreeVisi
   protected boolean isPrefixMatch = false;
   private IDeviceMNode<N> skipTemplateDevice;
 
+  private ReleaseFlushMonitor.RecordNode timeRecorder;
+
   protected Traverser() {}
 
   /**
@@ -104,6 +107,7 @@ public abstract class Traverser<R, N extends IMNode<N>> 
extends AbstractTreeVisi
     }
     this.startNode = startNode;
     this.nodes = nodes;
+    this.timeRecorder = store.recordTraverserStatistics();
   }
 
   /**
@@ -120,6 +124,7 @@ public abstract class Traverser<R, N extends IMNode<N>> 
extends AbstractTreeVisi
     this.store = store.getWithReentrantReadLock();
     initStack();
     this.startNode = startNode;
+    this.timeRecorder = store.recordTraverserStatistics();
   }
 
   /**
@@ -250,9 +255,11 @@ public abstract class Traverser<R, N extends IMNode<N>> 
extends AbstractTreeVisi
   public void close() {
     super.close();
     if (store instanceof ReentrantReadOnlyCachedMTreeStore) {
-      // TODO update here
       ((ReentrantReadOnlyCachedMTreeStore) store).unlockRead();
     }
+    if (timeRecorder != null) {
+      timeRecorder.setEndTime(System.currentTimeMillis());
+    }
   }
 
   public void setTemplateMap(Map<Integer, Template> templateMap, 
IMNodeFactory<N> nodeFactory) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/MonitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/MonitorTest.java
new file mode 100644
index 00000000000..cade004d72d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/MonitorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.mtree.schemafile;
+
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class MonitorTest {
+  private ReleaseFlushMonitor releaseFlushMonitor;
+
+  @Before
+  public void setUp() {
+    releaseFlushMonitor = ReleaseFlushMonitor.getInstance();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    releaseFlushMonitor.clear();
+  }
+
+  @Test
+  public void testGetRegionsToFlush() {
+    // free = 500
+    setRecord(
+        1, Arrays.asList(0L, 2L, 3L, 3000L, 4000L), Arrays.asList(100L, 2500L, 
200L, 5000L, 6000L));
+    // free = 2000
+    setRecord(
+        2, Arrays.asList(0L, 2L, 3L, 3000L, 4000L), Arrays.asList(100L, 1000L, 
200L, 5500L, 6000L));
+    // free = 700
+    setRecord(3, Arrays.asList(700L, 800L), Arrays.asList(900L, 6000L));
+    // free = 1700
+    setRecord(4, Arrays.asList(700L, 800L, 2500L), Arrays.asList(1000L, 1500L, 
5000L));
+    // free = 2100
+    setRecord(5, Arrays.asList(0L, 2000L), Arrays.asList(1000L, 3900L));
+    List<Integer> regions = releaseFlushMonitor.getRegionsToFlush(5000);
+    Assert.assertEquals(3, regions.size());
+    Assert.assertEquals(5, regions.get(0).intValue());
+    Assert.assertEquals(2, regions.get(1).intValue());
+    Assert.assertEquals(4, regions.get(2).intValue());
+  }
+
+  @Test
+  public void testGetRegionsToFlush2() {
+    setRecord(1, Arrays.asList(0L, 2000L), Arrays.asList(100L, 7000L));
+    setRecord(2, Collections.singletonList(3000L), 
Collections.singletonList(3500L));
+    List<Integer> regions = releaseFlushMonitor.getRegionsToFlush(7000);
+    Assert.assertEquals(1, regions.size());
+    Assert.assertEquals(2, regions.get(0).intValue());
+  }
+
+  private void setRecord(int regionId, List<Long> startTimes, List<Long> 
eneTimes) {
+    releaseFlushMonitor.initRecordList(regionId);
+    for (int i = 0; i < startTimes.size(); i++) {
+      ReleaseFlushMonitor.RecordNode node = 
releaseFlushMonitor.recordTraverserTime(regionId);
+      node.setStartTime(startTimes.get(i));
+      node.setEndTime(eneTimes.get(i));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index bef220b7c3a..7a07e03172a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -28,7 +28,7 @@ import 
org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
 import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaRegionStatistics;
 import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
@@ -76,7 +76,7 @@ public class SchemaStatisticsTest extends 
AbstractSchemaRegionTest {
       IMNodeFactory<ICachedMNode> nodeFactory =
           MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
       // wait release and flush task
-      Thread.sleep(1000);
+      Thread.sleep(6000);
       // schemaRegion1
       IMNode<ICachedMNode> sg1 =
           nodeFactory.createDatabaseMNode(
@@ -91,8 +91,7 @@ public class SchemaStatisticsTest extends 
AbstractSchemaRegionTest {
         Assert.assertEquals(
             size1 + nodeFactory.createDeviceMNode(sg1.getAsMNode(), 
"n").estimateSize(),
             schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
-        CacheMemoryManager.getInstance().forceFlushAndRelease();
-        Thread.sleep(1000);
+        ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
         Assert.assertEquals(
             size1, 
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
       }
@@ -267,7 +266,7 @@ public class SchemaStatisticsTest extends 
AbstractSchemaRegionTest {
         Assert.assertTrue(
             d0ExistSize == 
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()
                 || d1ExistSize == 
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
-        CacheMemoryManager.getInstance().forceFlushAndRelease();
+        ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
         // wait release and flush task
         Thread.sleep(1000);
         Assert.assertEquals(
@@ -453,7 +452,7 @@ public class SchemaStatisticsTest extends 
AbstractSchemaRegionTest {
         if (0 != cachedRegionStatistics1.getUnpinnedMNodeNum()) {
           // "d0" may remain in PartialMemory mode
           Assert.assertEquals("PBTree-PartialMemory", 
testParams.getTestModeName());
-          CacheMemoryManager.getInstance().forceFlushAndRelease();
+          ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
           Thread.sleep(1000);
           Assert.assertEquals(0, 
cachedRegionStatistics1.getUnpinnedMNodeNum());
         }
diff --git a/iotdb-core/datanode/src/test/resources/logback-test.xml 
b/iotdb-core/datanode/src/test/resources/logback-test.xml
index 8692100d47d..dc90663f6cd 100644
--- a/iotdb-core/datanode/src/test/resources/logback-test.xml
+++ b/iotdb-core/datanode/src/test/resources/logback-test.xml
@@ -44,6 +44,7 @@
     <logger name="org.apache.iotdb.db.sync" level="INFO"/>
     <logger name="org.apache.iotdb.db.storageengine.merge" level="INFO"/>
     <logger name="org.apache.iotdb.db.metadata" level="INFO"/>
+    <logger name="org.apache.iotdb.db.schemaengine" level="INFO"/>
     <logger name="org.apache.iotdb.commons.service.ThriftServiceThread" 
level="INFO"/>
     <logger name="org.eclipse.jetty.util.thread.QueuedThreadPool" 
level="INFO"/>
     <logger name="org.apache.iotdb.commons.service.metric.MetricService" 
level="INFO"/>
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 54ad69bbb9c..343711f0bab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
@@ -79,6 +80,27 @@ public class IoTDBThreadPoolFactory {
         poolName);
   }
 
+  /**
+   * see {@link Executors#newFixedThreadPool(int, 
java.util.concurrent.ThreadFactory)}.
+   *
+   * @param poolName - the name of thread pool
+   * @return fixed size thread pool
+   */
+  public static ExecutorService newFixedThreadPool(
+      int nThreads, String poolName, RejectedExecutionHandler handler) {
+    logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
+
+    return new WrappedThreadPoolExecutor(
+        nThreads,
+        nThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(),
+        new IoTThreadFactory(poolName),
+        poolName,
+        handler);
+  }
+
   public static ExecutorService newFixedThreadPoolWithDaemonThread(int 
nThreads, String poolName) {
     logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
     return new WrappedSingleThreadExecutorService(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index bd6c8097f1e..dbc26565b85 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -65,10 +65,10 @@ public enum ThreadName {
   // -------------------------- SchemaEngine --------------------------
   SCHEMA_REGION_RELEASE_PROCESSOR("SchemaRegion-Release-Task-Processor"),
   SCHEMA_REGION_RECOVER_TASK("SchemaRegion-recover-task"),
-  SCHEMA_RELEASE_MONITOR("Schema-Release-Task-Monitor"),
-  SCHEMA_REGION_FLUSH_PROCESSOR("SchemaRegion-Flush-Task-Processor"),
-  SCHEMA_FLUSH_MONITOR("Schema-Flush-Task-Monitor"),
   SCHEMA_FORCE_MLOG("SchemaEngine-TimedForceMLog-Thread"),
+  PBTREE_RELEASE_MONITOR("PBTree-Release-Task-Monitor"),
+  PBTREE_FLUSH_MONITOR("PBTree-Flush-Monitor"),
+  PBTREE_FLUSH_PROCESSOR("PBTree-Flush-Processor"),
   // -------------------------- ClientService --------------------------
   CLIENT_RPC_SERVICE("ClientRPC-Service"),
   CLIENT_RPC_PROCESSOR("ClientRPC-Processor"),
@@ -220,10 +220,10 @@ public enum ThreadName {
           Arrays.asList(
               SCHEMA_REGION_RELEASE_PROCESSOR,
               SCHEMA_REGION_RECOVER_TASK,
-              SCHEMA_RELEASE_MONITOR,
-              SCHEMA_REGION_FLUSH_PROCESSOR,
-              SCHEMA_FLUSH_MONITOR,
-              SCHEMA_FORCE_MLOG));
+              PBTREE_RELEASE_MONITOR,
+              SCHEMA_FORCE_MLOG,
+              PBTREE_FLUSH_MONITOR,
+              PBTREE_FLUSH_PROCESSOR));
 
   private static final Set<ThreadName> clientServiceThreadNames =
       new HashSet<>(Arrays.asList(CLIENT_RPC_SERVICE, CLIENT_RPC_PROCESSOR));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index d81c97f8454..49f914f5c47 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -73,6 +74,23 @@ public class WrappedThreadPoolExecutor extends 
ThreadPoolExecutor
     ThreadPoolMetrics.getInstance().registerThreadPool(this, this.mbeanName);
   }
 
+  /**
+   * Constructs a pool like the primary constructor but with an explicit {@link
+   * RejectedExecutionHandler} instead of the superclass default. The pool is registered as a JMX
+   * MBean and with {@code ThreadPoolMetrics} under the derived {@code mbeanName}.
+   *
+   * @param handler policy invoked by the superclass when a submitted task is rejected
+   */
+  public WrappedThreadPoolExecutor(
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      IoTThreadFactory ioTThreadFactory,
+      String mbeanName,
+      RejectedExecutionHandler handler) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
ioTThreadFactory, handler);
+    this.mbeanName =
+        String.format(
+            "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_JMX_NAME, 
IoTDBConstant.JMX_TYPE, mbeanName);
+    JMXService.registerMBean(this, this.mbeanName);
+    ThreadPoolMetrics.getInstance().registerThreadPool(this, this.mbeanName);
+  }
+
   @Override
   public void shutdown() {
     super.shutdown();


Reply via email to