This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
new 10c8c6d2727 PBTree Flush Control Monitor (#11699)
10c8c6d2727 is described below
commit 10c8c6d272746f213b1ee8a4c94d8db63090fd41
Author: Chen YZ <[email protected]>
AuthorDate: Sun Dec 17 12:23:39 2023 +0800
PBTree Flush Control Monitor (#11699)
* save
* save
* save
* fix some bugs
* save
* save
* check
* fix some comment
* pre-allocate despite the device is using template
* test ci
---------
Co-authored-by: BigGreyBear <[email protected]>
---
.../persistence/schema/ConfigMTreeStore.java | 7 +
.../metric/SchemaEngineCachedMetric.java | 65 +---
.../schemaengine/metric/SchemaMetricManager.java | 4 +-
.../schemaengine/rescon/SchemaResourceManager.java | 6 +-
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 10 +-
.../schemaregion/mtree/IMTreeStore.java | 3 +
.../schemaregion/mtree/impl/mem/MemMTreeStore.java | 9 +-
.../mtree/impl/pbtree/CachedMTreeStore.java | 96 +++---
.../pbtree/ReentrantReadOnlyCachedMTreeStore.java | 6 +
.../impl/pbtree/cache/CacheMemoryManager.java | 355 --------------------
.../impl/pbtree/cache/ReleaseFlushMonitor.java | 373 +++++++++++++++++++++
.../impl/pbtree/flush/PBTreeFlushExecutor.java | 71 +++-
.../mtree/impl/pbtree/flush/Scheduler.java | 261 ++++++++++++++
.../pbtree/lock/StampedWriterPreferredLock.java | 2 +-
.../pbtree/memcontrol/IReleaseFlushStrategy.java | 3 -
.../ReleaseFlushStrategyNumBasedImpl.java | 5 -
.../ReleaseFlushStrategySizeBasedImpl.java | 11 +-
.../pbtree/schemafile/pagemgr/PageManager.java | 15 +-
.../schemaregion/mtree/traverser/Traverser.java | 9 +-
.../db/metadata/mtree/schemafile/MonitorTest.java | 85 +++++
.../schemaRegion/SchemaStatisticsTest.java | 11 +-
.../datanode/src/test/resources/logback-test.xml | 1 +
.../commons/concurrent/IoTDBThreadPoolFactory.java | 22 ++
.../iotdb/commons/concurrent/ThreadName.java | 14 +-
.../threadpool/WrappedThreadPoolExecutor.java | 18 +
25 files changed, 926 insertions(+), 536 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
index 5f8f45a660e..ce384e87959 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTreeStore.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.AbstractTraverserIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MNodeIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MemoryTraverserIterator;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import org.apache.iotdb.db.schemaengine.template.Template;
import java.io.File;
@@ -143,4 +144,10 @@ public class ConfigMTreeStore implements
IMTreeStore<IConfigMNode> {
public boolean createSnapshot(File snapshotDir) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+ // do nothing
+ return null;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
index 59c3e417b80..ee14822fd6e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaEngineCachedMetric.java
@@ -19,32 +19,27 @@
package org.apache.iotdb.db.schemaengine.metric;
-import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
-import java.util.concurrent.TimeUnit;
-
public class SchemaEngineCachedMetric implements ISchemaEngineMetric {
// TODO: rename schema_file to pbtree
private static final String RELEASE_THRESHOLD =
"schema_file_release_threshold";
- private static final String FLUSH_THRESHOLD = "schema_file_flush_threshold";
private static final String PINNED_NODE_NUM = "schema_file_pinned_num";
private static final String UNPINNED_NODE_NUM = "schema_file_unpinned_num";
private static final String PINNED_MEM_SIZE = "schema_file_pinned_mem";
private static final String UNPINNED_MEM_SIZE = "schema_file_unpinned_mem";
- private static final String RELEASE_TIMER = "schema_file_release";
- private static final String FLUSH_TIMER = "schema_file_flush";
- private static final String RELEASE_THREAD_NUM =
"schema_file_release_thread_num";
- private static final String FLUSH_THREAD_NUM =
"schema_file_flush_thread_num";
+ private static final String RELEASE_TIMER = "schema_file_release"; // TODO: implement it
+ private static final String FLUSH_TIMER = "schema_file_flush"; // TODO: implement it
+ private static final String RELEASE_FLUSH_THREAD_NUM =
"schema_file_release_flush_thread_num";
private final CachedSchemaEngineStatistics engineStatistics;
@@ -66,14 +61,6 @@ public class SchemaEngineCachedMetric implements
ISchemaEngineMetric {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RELEASE_THRESHOLD);
- metricService.gauge(
- (long)
-
(IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion()
- * ReleaseFlushStrategySizeBasedImpl.FLUSH_THRESHOLD_RATION),
- Metric.SCHEMA_ENGINE.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- FLUSH_THRESHOLD);
metricService.createAutoGauge(
Metric.SCHEMA_ENGINE.toString(),
MetricLevel.IMPORTANT,
@@ -109,17 +96,10 @@ public class SchemaEngineCachedMetric implements
ISchemaEngineMetric {
metricService.createAutoGauge(
Metric.SCHEMA_ENGINE.toString(),
MetricLevel.IMPORTANT,
- CacheMemoryManager.getInstance(),
- CacheMemoryManager::getReleaseThreadNum,
- Tag.NAME.toString(),
- RELEASE_THREAD_NUM);
- metricService.createAutoGauge(
- Metric.SCHEMA_ENGINE.toString(),
- MetricLevel.IMPORTANT,
- CacheMemoryManager.getInstance(),
- CacheMemoryManager::getFlushThreadNum,
+ ReleaseFlushMonitor.getInstance(),
+ ReleaseFlushMonitor::getActiveWorkerNum,
Tag.NAME.toString(),
- FLUSH_THREAD_NUM);
+ RELEASE_FLUSH_THREAD_NUM);
}
@Override
@@ -127,8 +107,6 @@ public class SchemaEngineCachedMetric implements
ISchemaEngineMetric {
schemaEngineMemMetric.unbindFrom(metricService);
metricService.remove(
MetricType.GAUGE, Metric.SCHEMA_ENGINE.toString(),
Tag.NAME.toString(), RELEASE_THRESHOLD);
- metricService.remove(
- MetricType.GAUGE, Metric.SCHEMA_ENGINE.toString(),
Tag.NAME.toString(), FLUSH_THRESHOLD);
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.SCHEMA_ENGINE.toString(),
@@ -157,33 +135,6 @@ public class SchemaEngineCachedMetric implements
ISchemaEngineMetric {
MetricType.AUTO_GAUGE,
Metric.SCHEMA_ENGINE.toString(),
Tag.NAME.toString(),
- RELEASE_THREAD_NUM);
- metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.SCHEMA_ENGINE.toString(),
- Tag.NAME.toString(),
- FLUSH_THREAD_NUM);
- }
-
- public void recordFlush(long milliseconds) {
- MetricService.getInstance()
- .timer(
- milliseconds,
- TimeUnit.MILLISECONDS,
- Metric.SCHEMA_ENGINE.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- FLUSH_TIMER);
- }
-
- public void recordRelease(long milliseconds) {
- MetricService.getInstance()
- .timer(
- milliseconds,
- TimeUnit.MILLISECONDS,
- Metric.SCHEMA_ENGINE.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- RELEASE_TIMER);
+ RELEASE_FLUSH_THREAD_NUM);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
index 32ede440dbd..73e07ad00cb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/metric/SchemaMetricManager.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,7 +40,7 @@ public class SchemaMetricManager {
SchemaEngineCachedMetric schemaEngineCachedMetric =
new
SchemaEngineCachedMetric(engineStatistics.getAsCachedSchemaEngineStatistics());
engineMetric = schemaEngineCachedMetric;
-
CacheMemoryManager.getInstance().setEngineMetric(schemaEngineCachedMetric);
+
ReleaseFlushMonitor.getInstance().setEngineMetric(schemaEngineCachedMetric);
}
MetricService.getInstance().addMetricSet(engineMetric);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
index 56bf96deebf..385109ed471 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.schemaengine.rescon;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
public class SchemaResourceManager {
@@ -46,10 +46,10 @@ public class SchemaResourceManager {
}
private static void initSchemaFileModeResource(ISchemaEngineStatistics
engineStatistics) {
- CacheMemoryManager.getInstance().init(engineStatistics);
+ ReleaseFlushMonitor.getInstance().init(engineStatistics);
}
private static void clearSchemaFileModeResource() {
- CacheMemoryManager.getInstance().clear();
+ ReleaseFlushMonitor.getInstance().clear();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index f9a80332187..f8f4b6f4ce5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -58,7 +58,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.logfile.SchemaLogWriter;
import
org.apache.iotdb.db.schemaengine.schemaregion.logfile.visitor.SchemaRegionPlanDeserializer;
import
org.apache.iotdb.db.schemaengine.schemaregion.logfile.visitor.SchemaRegionPlanSerializer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.MTreeBelowSGCachedImpl;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowDevicesPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowNodesPlan;
@@ -580,7 +580,7 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws
MetadataException {
while (!regionStatistics.isAllowToCreateNewSeries()) {
- CacheMemoryManager.getInstance().waitIfReleasing();
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
}
PartialPath path = plan.getPath();
@@ -666,7 +666,7 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan)
throws MetadataException {
int seriesCount = plan.getMeasurements().size();
while (!regionStatistics.isAllowToCreateNewSeries()) {
- CacheMemoryManager.getInstance().waitIfReleasing();
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
}
try {
@@ -841,7 +841,7 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
@Override
public void createLogicalView(ICreateLogicalViewPlan plan) throws
MetadataException {
while (!regionStatistics.isAllowToCreateNewSeries()) {
- CacheMemoryManager.getInstance().waitIfReleasing();
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
}
try {
List<PartialPath> pathList = plan.getViewPathList();
@@ -1232,7 +1232,7 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
public void activateSchemaTemplate(IActivateTemplateInClusterPlan plan,
Template template)
throws MetadataException {
while (!regionStatistics.isAllowToCreateNewSeries()) {
- CacheMemoryManager.getInstance().waitIfReleasing();
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
}
try {
ICachedMNode deviceNode =
getDeviceNodeWithAutoCreate(plan.getActivatePath());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
index 81f8867d80c..67af9efead9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/IMTreeStore.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.schema.node.IMNode;
import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeIterator;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import org.apache.iotdb.db.schemaengine.template.Template;
import java.io.File;
@@ -99,4 +100,6 @@ public interface IMTreeStore<N extends IMNode<N>> {
void clear();
boolean createSnapshot(File snapshotDir);
+
+ ReleaseFlushMonitor.RecordNode recordTraverserStatistics();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
index 2eab7c65563..22ee4235125 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterat
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MNodeIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MemoryTraverserIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.snapshot.MemMTreeSnapshotUtil;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -48,7 +49,7 @@ public class MemMTreeStore implements IMTreeStore<IMemMNode> {
private final MemSchemaRegionStatistics regionStatistics;
private final IMNodeFactory<IMemMNode> nodeFactory =
- MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();;
+ MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();
private IMemMNode root;
@@ -218,6 +219,12 @@ public class MemMTreeStore implements
IMTreeStore<IMemMNode> {
regionStatistics);
}
+ @Override
+ public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+ // do nothing
+ return null;
+ }
+
private void requestMemory(int size) {
if (regionStatistics != null) {
regionStatistics.requestMemory(size);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 76aafffd435..708173bca5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -31,8 +31,8 @@ import
org.apache.iotdb.db.schemaengine.rescon.CachedSchemaRegionStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.estimator.MNodeSizeEstimator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.AbstractTraverserIterator;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush.PBTreeFlushExecutor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
@@ -66,10 +66,12 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
private final ICacheManager cacheManager;
private ISchemaFile file;
private ICachedMNode root;
+ // TODO: delete it
private final Runnable flushCallback;
private final IMNodeFactory<ICachedMNode> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
private final CachedSchemaRegionStatistics regionStatistics;
+ private final ReleaseFlushMonitor releaseFlushMonitor =
ReleaseFlushMonitor.getInstance();
private final LockManager lockManager = new LockManager();
public CachedMTreeStore(
@@ -84,8 +86,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
this.regionStatistics = regionStatistics;
this.memManager = new MemManager(regionStatistics);
this.flushCallback = flushCallback;
- this.cacheManager =
- CacheMemoryManager.getInstance().createLRUCacheManager(this,
memManager, lockManager);
+ this.cacheManager = releaseFlushMonitor.createLRUCacheManager(this,
memManager, lockManager);
cacheManager.initRootStatus(root);
regionStatistics.setCacheManager(cacheManager);
ensureMemoryStatus();
@@ -108,6 +109,18 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
return res;
}
+ public ICacheManager getCacheManager() {
+ return cacheManager;
+ }
+
+ public ISchemaFile getSchemaFile() {
+ return file;
+ }
+
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
@Override
public ICachedMNode getRoot() {
return root;
@@ -519,7 +532,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
public void clear() {
lockManager.globalWriteLock();
try {
- CacheMemoryManager.getInstance().clearCachedMTreeStore(this);
+ releaseFlushMonitor.clearCachedMTreeStore(this);
regionStatistics.setCacheManager(null);
cacheManager.clear(root);
root = null;
@@ -559,6 +572,11 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
snapshotDir, storageGroup, schemaRegionId, regionStatistics,
flushCallback);
}
+ @Override
+ public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+ return releaseFlushMonitor.recordTraverserTime(schemaRegionId);
+ }
+
private CachedMTreeStore(
File snapshotDir,
String storageGroup,
@@ -572,15 +590,14 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
this.regionStatistics = regionStatistics;
this.memManager = new MemManager(regionStatistics);
this.flushCallback = flushCallback;
- this.cacheManager =
- CacheMemoryManager.getInstance().createLRUCacheManager(this,
memManager, lockManager);
+ this.cacheManager = releaseFlushMonitor.createLRUCacheManager(this,
memManager, lockManager);
cacheManager.initRootStatus(root);
regionStatistics.setCacheManager(cacheManager);
ensureMemoryStatus();
}
private void ensureMemoryStatus() {
- CacheMemoryManager.getInstance().ensureMemoryStatus();
+ releaseFlushMonitor.ensureMemoryStatus();
}
public CachedSchemaRegionStatistics getRegionStatistics() {
@@ -606,43 +623,29 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
lockManager.globalReadLock();
}
try {
- boolean hasVolatileNodes = flushVolatileDBNode();
+ PBTreeFlushExecutor flushExecutor;
+ IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode =
+ cacheManager.collectUpdatedStorageGroupMNodes();
+ if (updatedStorageGroupMNode != null) {
+ flushExecutor =
+ new PBTreeFlushExecutor(updatedStorageGroupMNode, cacheManager,
file, lockManager);
+ flushExecutor.flushDatabase();
+ }
Iterator<ICachedMNode> volatileSubtrees =
cacheManager.collectVolatileSubtrees();
- if (volatileSubtrees.hasNext()) {
- hasVolatileNodes = true;
-
- long startTime = System.currentTimeMillis();
-
- ICachedMNode subtreeRoot;
- PBTreeFlushExecutor flushExecutor;
- while (volatileSubtrees.hasNext()) {
- subtreeRoot = volatileSubtrees.next();
- flushExecutor =
- new PBTreeFlushExecutor(subtreeRoot, needLock, cacheManager,
file, lockManager);
- flushExecutor.flushVolatileNodes();
- }
-
- long time = System.currentTimeMillis() - startTime;
- if (time > 10_000) {
- LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time,
schemaRegionId);
- } else {
- LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}",
time, schemaRegionId);
- }
- }
- if (hasVolatileNodes) {
- flushCallback.run();
+ long startTime = System.currentTimeMillis();
+ flushExecutor = new PBTreeFlushExecutor(volatileSubtrees, cacheManager,
file, lockManager);
+ flushExecutor.flushVolatileNodes();
+ long time = System.currentTimeMillis() - startTime;
+ if (time > 10_000) {
+ LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time,
schemaRegionId);
+ } else {
+ LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", time,
schemaRegionId);
}
-
- ensureMemoryStatus();
- } catch (MetadataException | IOException e) {
- LOGGER.warn(
- "Exception occurred during MTree flush, current SchemaRegionId is
{}", schemaRegionId, e);
} catch (Throwable e) {
LOGGER.error(
"Error occurred during MTree flush, current SchemaRegionId is {}",
schemaRegionId, e);
- e.printStackTrace();
} finally {
if (needLock) {
lockManager.globalReadUnlock();
@@ -650,25 +653,6 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
}
- private boolean flushVolatileDBNode() throws IOException {
- IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode =
- cacheManager.collectUpdatedStorageGroupMNodes();
- if (updatedStorageGroupMNode == null) {
- return false;
- }
-
- try {
- file.updateDatabaseNode(updatedStorageGroupMNode);
- return true;
- } catch (IOException e) {
- LOGGER.warn(
- "IOException occurred during updating StorageGroupMNode {}",
- updatedStorageGroupMNode.getFullPath(),
- e);
- throw e;
- }
- }
-
/**
* Since any node R/W operation may change the memory status, thus it should be controlled during
* iterating child nodes.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
index 969bcdcfce4..964731bd455 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeIterator;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -132,6 +133,11 @@ public class ReentrantReadOnlyCachedMTreeStore implements
IMTreeStore<ICachedMNo
throw new UnsupportedOperationException("ReadOnlyReentrantMTreeStore");
}
+ @Override
+ public ReleaseFlushMonitor.RecordNode recordTraverserStatistics() {
+ return store.recordTraverserStatistics();
+ }
+
public void unlockRead() {
store.stampedReadUnlock(readLockStamp);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
deleted file mode 100644
index d5c6bec1e28..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
-import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
-import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
-import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-
-/**
- * CacheMemoryManager is used to register the CachedMTreeStore and create the
CacheManager.
- * CacheMemoryManager provides the {@link
CacheMemoryManager#ensureMemoryStatus} interface, which
- * starts asynchronous threads to free and flush the disk when memory usage
exceeds a threshold.
- */
-public class CacheMemoryManager {
-
- private static final Logger logger =
LoggerFactory.getLogger(CacheMemoryManager.class);
-
- private final List<CachedMTreeStore> storeList = new
CopyOnWriteArrayList<>();
-
- private CachedSchemaEngineStatistics engineStatistics;
- private SchemaEngineCachedMetric engineMetric;
-
- private static final int CONCURRENT_NUM = 10;
-
- private ExecutorService flushTaskProcessor;
- private ExecutorService flushTaskMonitor;
- private ExecutorService releaseTaskProcessor;
- private ExecutorService releaseTaskMonitor;
-
- private FiniteSemaphore flushSemaphore;
- private FiniteSemaphore releaseSemaphore;
-
- private volatile boolean hasFlushTask;
-
- private volatile boolean hasReleaseTask;
-
- private IReleaseFlushStrategy releaseFlushStrategy;
-
- private static final int MAX_WAITING_TIME_WHEN_RELEASING = 3_000;
- private final Object blockObject = new Object();
-
- /**
- * Create and allocate LRUCacheManager to the corresponding CachedMTreeStore.
- *
- * @param store CachedMTreeStore
- * @return LRUCacheManager
- */
- public ICacheManager createLRUCacheManager(
- CachedMTreeStore store, MemManager memManager, LockManager lockManager) {
- ICacheManager cacheManager = new LRUCacheManager(memManager, lockManager);
- storeList.add(store);
- return cacheManager;
- }
-
- public void clearCachedMTreeStore(CachedMTreeStore store) {
- storeList.remove(store);
- }
-
- public void init(ISchemaEngineStatistics engineStatistics) {
- flushSemaphore = new FiniteSemaphore(2, 0);
- releaseSemaphore = new FiniteSemaphore(2, 0);
- this.engineStatistics =
engineStatistics.getAsCachedSchemaEngineStatistics();
- if
(IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInPBTreeMode() >=
0) {
- releaseFlushStrategy = new
ReleaseFlushStrategyNumBasedImpl(this.engineStatistics);
- } else {
- releaseFlushStrategy = new
ReleaseFlushStrategySizeBasedImpl(this.engineStatistics);
- }
- flushTaskMonitor =
-
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_FLUSH_MONITOR.getName());
- flushTaskProcessor =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- CONCURRENT_NUM,
ThreadName.SCHEMA_REGION_FLUSH_PROCESSOR.getName());
- releaseTaskMonitor =
-
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_RELEASE_MONITOR.getName());
- releaseTaskProcessor =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- CONCURRENT_NUM,
ThreadName.SCHEMA_REGION_RELEASE_PROCESSOR.getName());
- releaseTaskMonitor.submit(
- () -> {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- releaseSemaphore.acquire();
- try {
- if (isExceedReleaseThreshold()) {
- hasReleaseTask = true;
- tryExecuteMemoryRelease();
- }
- } catch (Throwable throwable) {
- hasReleaseTask = false;
- logger.error("Something wrong happened during MTree release.",
throwable);
- }
- }
- } catch (InterruptedException e) {
- logger.info("ReleaseTaskMonitor thread is interrupted.");
- Thread.currentThread().interrupt();
- }
- });
- flushTaskMonitor.submit(
- () -> {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- flushSemaphore.acquire();
- try {
- if (isExceedFlushThreshold()) {
- hasFlushTask = true;
- tryFlushVolatileNodes();
- }
- } catch (Throwable throwable) {
- hasFlushTask = false;
- logger.error("Something wrong happened during MTree flush.",
throwable);
- }
- }
- } catch (InterruptedException e) {
- logger.info("FlushTaskMonitor thread is interrupted.");
- Thread.currentThread().interrupt();
- }
- });
- }
-
- public void setEngineMetric(SchemaEngineCachedMetric engineMetric) {
- this.engineMetric = engineMetric;
- }
-
- public boolean isExceedReleaseThreshold() {
- return releaseFlushStrategy.isExceedReleaseThreshold();
- }
-
- public boolean isExceedFlushThreshold() {
- return releaseFlushStrategy.isExceedFlushThreshold();
- }
-
- /**
- * Check the current memory usage. If the release threshold is exceeded,
trigger the task to
- * perform an internal and external memory swap to release the memory.
- */
- public void ensureMemoryStatus() {
- if (isExceedReleaseThreshold()) {
- registerReleaseTask();
- }
- }
-
- /**
- * If there is a ReleaseTask or FlushTask, block the current thread to wait
up to
- * MAX_WAITING_TIME_WHEN_RELEASING. The thread will be woken up if the
ReleaseTask or FlushTask
- * ends or the wait time exceeds MAX_WAITING_TIME_WHEN_RELEASING.
- */
- public void waitIfReleasing() {
- synchronized (blockObject) {
- if (hasReleaseTask || hasFlushTask) {
- try {
- blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
- } catch (InterruptedException e) {
- logger.warn(
- "Interrupt because the release task and flush task did not
finish within {} milliseconds.",
- MAX_WAITING_TIME_WHEN_RELEASING);
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- private void registerReleaseTask() {
- releaseSemaphore.release();
- }
-
- /**
- * Execute cache eviction until the memory status is under safe mode or no
node could be evicted.
- * If the memory status is still full, which means the nodes in memory are
all volatile nodes, new
- * added or updated, fire flush task.
- */
- private void tryExecuteMemoryRelease() {
- long startTime = System.currentTimeMillis();
- CompletableFuture.allOf(
- storeList.stream()
- .map(
- store ->
- CompletableFuture.runAsync(
- () -> {
- executeMemoryRelease(store);
- },
- releaseTaskProcessor))
- .toArray(CompletableFuture[]::new))
- .join();
- if (engineMetric != null) {
- engineMetric.recordRelease(System.currentTimeMillis() - startTime);
- }
- synchronized (blockObject) {
- hasReleaseTask = false;
- if (isExceedFlushThreshold()) {
- registerFlushTask();
- } else {
- blockObject.notifyAll();
- }
- }
- }
-
- /**
- * Keep fetching evictable nodes from cacheManager until the memory status
is under safe mode or
- * no node could be evicted. Update the memory status after evicting each
node.
- */
- private void executeMemoryRelease(CachedMTreeStore store) {
- while (isExceedReleaseThreshold()) {
- // store try to release memory if not exceed release threshold
- if (store.executeMemoryRelease()) {
- // if store can not release memory, break
- break;
- }
- }
- }
-
- private void registerFlushTask() {
- flushSemaphore.release();
- }
-
- /** Sync all volatile nodes to schemaFile and execute memory release after
flush. */
- private void tryFlushVolatileNodes() {
- long startTime = System.currentTimeMillis();
- CompletableFuture.allOf(
- storeList.stream()
- .map(
- store ->
- CompletableFuture.runAsync(
- () -> {
- store.flushVolatileNodes(true);
- },
- flushTaskProcessor))
- .toArray(CompletableFuture[]::new))
- .join();
- if (engineMetric != null) {
- engineMetric.recordFlush(System.currentTimeMillis() - startTime);
- }
- synchronized (blockObject) {
- hasFlushTask = false;
- blockObject.notifyAll();
- }
- }
-
- public void clear() {
- if (releaseTaskMonitor != null) {
- releaseTaskMonitor.shutdownNow();
- while (true) {
- if (releaseTaskMonitor.isTerminated()) break;
- }
- releaseTaskMonitor = null;
- }
- if (flushTaskMonitor != null) {
- flushTaskMonitor.shutdownNow();
- while (true) {
- if (flushTaskMonitor.isTerminated()) break;
- }
- releaseTaskMonitor = null;
- }
- if (releaseTaskProcessor != null) {
- while (true) {
- if (!hasReleaseTask) break;
- }
- releaseTaskProcessor.shutdown();
- while (true) {
- if (releaseTaskProcessor.isTerminated()) break;
- }
- releaseTaskProcessor = null;
- }
- // the release task may submit flush task, thus must be shut down and
clear first
- if (flushTaskProcessor != null) {
- while (true) {
- if (!hasFlushTask) break;
- }
- flushTaskProcessor.shutdown();
- while (true) {
- if (flushTaskProcessor.isTerminated()) break;
- }
- flushTaskProcessor = null;
- }
- storeList.clear();
- releaseFlushStrategy = null;
- engineStatistics = null;
- releaseSemaphore = null;
- flushSemaphore = null;
- engineMetric = null;
- }
-
- public int getReleaseThreadNum() {
- return ((WrappedThreadPoolExecutor) releaseTaskProcessor).getActiveCount();
- }
-
- public int getFlushThreadNum() {
- return ((WrappedThreadPoolExecutor) flushTaskProcessor).getActiveCount();
- }
-
- private CacheMemoryManager() {}
-
- private static class GlobalCacheManagerHolder {
- private static final CacheMemoryManager INSTANCE = new
CacheMemoryManager();
-
- private GlobalCacheManagerHolder() {}
- }
-
- public static CacheMemoryManager getInstance() {
- return CacheMemoryManager.GlobalCacheManagerHolder.INSTANCE;
- }
-
- @TestOnly
- public void forceFlushAndRelease() {
- releaseFlushStrategy =
- new IReleaseFlushStrategy() {
- @Override
- public boolean isExceedReleaseThreshold() {
- return true;
- }
-
- @Override
- public boolean isExceedFlushThreshold() {
- return true;
- }
- };
- registerFlushTask();
- registerReleaseTask();
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ReleaseFlushMonitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ReleaseFlushMonitor.java
new file mode 100644
index 00000000000..5837713e42a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ReleaseFlushMonitor.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
+import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
+import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush.Scheduler;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
+import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * ReleaseFlushMonitor is used to register the CachedMTreeStore and create the
CacheManager.
+ * ReleaseFlushMonitor provides the {@link
ReleaseFlushMonitor#ensureMemoryStatus} interface, which
+ * triggers asynchronous memory release and flushing to disk when memory usage
exceeds a threshold.
+ */
+public class ReleaseFlushMonitor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReleaseFlushMonitor.class);
+
+ /** configuration */
+ private static final double FREE_FLUSH_PROPORTION = 0.2;
+
+ private static final int MONITOR_INETRVAL_MILLISECONDS = 5000;
+ private static final int MAX_WAITING_TIME_WHEN_RELEASING = 3_000;
+
+ /** data structure */
+ private final Map<Integer, RecordList> regionToTraverserTime = new
ConcurrentHashMap<>();
+
+ private final Map<Integer, CachedMTreeStore> regionToStoreMap = new
ConcurrentHashMap<>();
+ private final Set<Integer> flushingRegionSet = new CopyOnWriteArraySet<>();
+ private CachedSchemaEngineStatistics engineStatistics;
+ private SchemaEngineCachedMetric engineMetric;
+ private IReleaseFlushStrategy releaseFlushStrategy;
+
+ /** thread and lock */
+ private final Object blockObject = new Object();
+
+ private ScheduledExecutorService flushMonitor;
+ private ExecutorService releaseMonitor;
+ private FiniteSemaphore releaseSemaphore;
+ private Scheduler scheduler;
+
+ /**
+ * Create and allocate LRUCacheManager to the corresponding CachedMTreeStore.
+ *
+ * @param store CachedMTreeStore
+ * @return LRUCacheManager
+ */
+ public ICacheManager createLRUCacheManager(
+ CachedMTreeStore store, MemManager memManager, LockManager lockManager) {
+ ICacheManager cacheManager = new LRUCacheManager(memManager, lockManager);
+ regionToStoreMap.put(store.getRegionStatistics().getSchemaRegionId(),
store);
+ regionToTraverserTime.put(store.getRegionStatistics().getSchemaRegionId(),
new RecordList());
+ return cacheManager;
+ }
+
+ public void clearCachedMTreeStore(CachedMTreeStore store) {
+ regionToStoreMap.remove(store.getRegionStatistics().getSchemaRegionId());
+ }
+
+ public void init(ISchemaEngineStatistics engineStatistics) {
+ releaseSemaphore = new FiniteSemaphore(2, 0);
+ this.engineStatistics =
engineStatistics.getAsCachedSchemaEngineStatistics();
+ if
(IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInPBTreeMode() >=
0) {
+ releaseFlushStrategy = new
ReleaseFlushStrategyNumBasedImpl(this.engineStatistics);
+ } else {
+ releaseFlushStrategy = new
ReleaseFlushStrategySizeBasedImpl(this.engineStatistics);
+ }
+ scheduler = new Scheduler(regionToStoreMap, flushingRegionSet,
releaseFlushStrategy);
+ releaseMonitor =
+
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.PBTREE_RELEASE_MONITOR.getName());
+ flushMonitor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PBTREE_FLUSH_MONITOR.getName());
+ releaseMonitor.submit(
+ () -> {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ releaseSemaphore.acquire();
+ // 1. first, it will try to release node cache
+ if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+ scheduler.scheduleRelease(false);
+ // 2. if it still exceeds release threshold, it will try to
flush node buffer, then
+ // release node cache again
+ if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+ scheduler.forceFlushAll();
+ regionToTraverserTime.values().forEach(RecordList::clear);
+ scheduler.scheduleRelease(false);
+ }
+ synchronized (blockObject) {
+ // invoke the notifyAll() method to wake up the thread
waiting for the release
+ blockObject.notifyAll();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.info("ReleaseTaskMonitor thread is interrupted.");
+ Thread.currentThread().interrupt();
+ }
+ });
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ flushMonitor,
+ () -> {
+ if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+ releaseSemaphore.release();
+ } else {
+
scheduler.scheduleFlush(getRegionsToFlush(System.currentTimeMillis()));
+ }
+ },
+ MONITOR_INETRVAL_MILLISECONDS,
+ MONITOR_INETRVAL_MILLISECONDS,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void setEngineMetric(SchemaEngineCachedMetric engineMetric) {
+ this.engineMetric = engineMetric;
+ }
+
+ /**
+ * Check the current memory usage. If the release threshold is exceeded,
trigger the task to
+ * perform an internal and external memory swap to release the memory.
+ */
+ public void ensureMemoryStatus() {
+ if (releaseFlushStrategy.isExceedReleaseThreshold()) {
+ releaseSemaphore.release();
+ }
+ }
+
+ /**
+ * Block the current thread for up to MAX_WAITING_TIME_WHEN_RELEASING
milliseconds.
+ * The thread is woken up earlier if the release monitor finishes a
release round and calls
+ * notifyAll(); the wait itself is unconditional and is not guarded by a task flag.
+ */
+ public void waitIfReleasing() {
+ synchronized (blockObject) {
+ try {
+ blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
+ } catch (InterruptedException e) {
+ logger.warn(
+ "Interrupt because the release task and flush task did not finish
within {} milliseconds.",
+ MAX_WAITING_TIME_WHEN_RELEASING);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public RecordNode recordTraverserTime(int regionId) {
+ return regionToTraverserTime.get(regionId).createAndAddToTail();
+ }
+
+ @TestOnly
+ public void initRecordList(int regionId) {
+ regionToTraverserTime.computeIfAbsent(regionId, k -> new RecordList());
+ }
+
+ public List<Integer> getRegionsToFlush(long windowsEndTime) {
+ long windowsStartTime = windowsEndTime - MONITOR_INETRVAL_MILLISECONDS;
+ List<Pair<Integer, Long>> regionAndFreeTimeList = new ArrayList<>();
+ for (Map.Entry<Integer, RecordList> entry :
regionToTraverserTime.entrySet()) {
+ int regionId = entry.getKey();
+ long traverserEndTime = windowsStartTime;
+ long traverserFreeTime = 0;
+ RecordList recordList = entry.getValue();
+ Iterator<RecordNode> iterator = recordList.iterator();
+ while (iterator.hasNext()) {
+ RecordNode recordNode = iterator.next();
+ if (recordNode.startTime > windowsEndTime) {
+ break;
+ }
+ if (recordNode.startTime > traverserEndTime) {
+ traverserFreeTime += (recordNode.startTime - traverserEndTime);
+ traverserEndTime = recordNode.endTime;
+ } else if (recordNode.endTime > traverserEndTime) {
+ traverserEndTime = recordNode.endTime;
+ }
+ if (recordNode.endTime < windowsStartTime) {
+ iterator.remove();
+ } else if (recordNode.endTime >= windowsEndTime) {
+ break;
+ }
+ }
+ if (traverserEndTime < windowsEndTime) {
+ traverserFreeTime += (windowsEndTime - traverserEndTime);
+ }
+ if (traverserFreeTime > FREE_FLUSH_PROPORTION *
MONITOR_INETRVAL_MILLISECONDS) {
+ regionAndFreeTimeList.add(new Pair<>(regionId, traverserFreeTime));
+ }
+ }
+ regionAndFreeTimeList.sort(Comparator.comparing((Pair<Integer, Long> o) ->
o.right).reversed());
+ return
regionAndFreeTimeList.stream().map(Pair::getLeft).collect(Collectors.toList());
+ }
+
+ @TestOnly
+ public void forceFlushAndRelease() {
+ scheduler.forceFlushAll();
+ scheduler.scheduleRelease(true);
+ }
+
+ public void clear() {
+ if (releaseMonitor != null) {
+ releaseMonitor.shutdownNow();
+ while (true) {
+ if (releaseMonitor.isTerminated()) break;
+ }
+ releaseMonitor = null;
+ }
+ if (flushMonitor != null) {
+ flushMonitor.shutdownNow();
+ while (true) {
+ if (flushMonitor.isTerminated()) break;
+ }
+ flushMonitor = null;
+ }
+ if (scheduler != null) {
+ scheduler.clear();
+ while (true) {
+ if (scheduler.isTerminated()) break;
+ }
+ scheduler = null;
+ }
+ regionToStoreMap.clear();
+ flushingRegionSet.clear();
+ regionToTraverserTime.clear();
+ releaseFlushStrategy = null;
+ engineStatistics = null;
+ releaseSemaphore = null;
+ engineMetric = null;
+ }
+
+ public int getActiveWorkerNum() {
+ return scheduler.getActiveWorkerNum();
+ }
+
+ private ReleaseFlushMonitor() {}
+
+ private static class ReleaseFlushMonitorHolder {
+ private static final ReleaseFlushMonitor INSTANCE = new
ReleaseFlushMonitor();
+
+ private ReleaseFlushMonitorHolder() {}
+ }
+
+ public static ReleaseFlushMonitor getInstance() {
+ return ReleaseFlushMonitor.ReleaseFlushMonitorHolder.INSTANCE;
+ }
+
+ @NotThreadSafe
+ private static class RecordList {
+ // The start time of RecordNode is incremental from head to tail
+ private final RecordNode head = new RecordNode();
+ private final RecordNode tail = new RecordNode();
+
+ private RecordList() {
+ head.next = tail;
+ tail.prev = head;
+ }
+
+ private synchronized RecordNode createAndAddToTail() {
+ RecordNode recordNode = new RecordNode();
+ recordNode.prev = tail.prev;
+ recordNode.next = tail;
+ tail.prev.next = recordNode;
+ tail.prev = recordNode;
+ return recordNode;
+ }
+
+ private synchronized void remove(RecordNode recordNode) {
+ recordNode.prev.next = recordNode.next;
+ recordNode.next.prev = recordNode.prev;
+ recordNode.prev = null;
+ recordNode.next = null;
+ }
+
+ private synchronized void clear() {
+ head.next = tail;
+ tail.prev = head;
+ }
+
+ private Iterator<RecordNode> iterator() {
+ return new Iterator<RecordNode>() {
+ private RecordNode next = null;
+ private RecordNode cur = head;
+
+ @Override
+ public boolean hasNext() {
+ if (next == null && cur.next != tail) {
+ next = cur.next;
+ }
+ return next != null;
+ }
+
+ @Override
+ public RecordNode next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ cur = next;
+ next = null;
+ return cur;
+ }
+
+ @Override
+ public void remove() {
+ if (next == null && cur.next != tail) {
+ next = cur.next;
+ }
+ RecordList.this.remove(cur);
+ }
+ };
+ }
+ }
+
+ public static class RecordNode {
+ private RecordNode prev = null;
+ private RecordNode next = null;
+ private Long startTime = System.currentTimeMillis();
+ private Long endTime = Long.MAX_VALUE;
+
+ @TestOnly
+ public void setStartTime(Long startTime) {
+ this.startTime = startTime;
+ }
+
+ public void setEndTime(Long endTime) {
+ this.endTime = endTime;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
index 614c28087fa..b1b59b630c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -39,43 +41,85 @@ public class PBTreeFlushExecutor {
private static final Logger logger =
LoggerFactory.getLogger(PBTreeFlushExecutor.class);
- private final ICachedMNode subtreeRoot;
+ private final Iterator<ICachedMNode> subtreeRoots;
private final ICacheManager cacheManager;
-
private final ISchemaFile file;
-
private final LockManager lockManager;
public PBTreeFlushExecutor(
- ICachedMNode subtreeRoot,
- boolean needLock,
+ Iterator<ICachedMNode> subtreeRoots,
+ ICacheManager cacheManager,
+ ISchemaFile file,
+ LockManager lockManager) {
+ this.subtreeRoots = subtreeRoots;
+ this.cacheManager = cacheManager;
+ this.file = file;
+ this.lockManager = lockManager;
+ }
+
+ public PBTreeFlushExecutor(
+ IDatabaseMNode<ICachedMNode> databaseMNode,
ICacheManager cacheManager,
ISchemaFile file,
LockManager lockManager) {
- this.subtreeRoot = subtreeRoot;
+ this.subtreeRoots =
Collections.singletonList(databaseMNode.getAsMNode()).iterator();
this.cacheManager = cacheManager;
this.file = file;
this.lockManager = lockManager;
}
- public void flushVolatileNodes() throws MetadataException, IOException {
+ public void flushVolatileNodes() throws MetadataException {
+ List<Exception> exceptions = new ArrayList<>();
+ while (subtreeRoots.hasNext()) {
+ try {
+ processFlushNonDatabase(subtreeRoots.next());
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ if (!exceptions.isEmpty()) {
+ throw new MetadataException(
+ exceptions.stream().map(Exception::getMessage).reduce("", (a, b) ->
a + ", " + b));
+ }
+ }
+
+ public void flushDatabase() throws IOException {
+ while (subtreeRoots.hasNext()) {
+ ICachedMNode subtreeRoot = subtreeRoots.next();
+ processFlushDatabase(subtreeRoot.getAsDatabaseMNode());
+ }
+ }
+
+ private void processFlushDatabase(IDatabaseMNode<ICachedMNode>
updatedStorageGroupMNode)
+ throws IOException {
+ try {
+ file.updateDatabaseNode(updatedStorageGroupMNode);
+ } catch (IOException e) {
+ logger.warn(
+ "IOException occurred during updating StorageGroupMNode {}",
+ updatedStorageGroupMNode.getFullPath(),
+ e);
+ throw e;
+ }
+ }
+
+ private void processFlushNonDatabase(ICachedMNode subtreeRoot)
+ throws MetadataException, IOException {
Iterator<ICachedMNode> volatileSubtreeIterator;
List<ICachedMNode> collectedVolatileSubtrees;
try {
- file.writeMNode(this.subtreeRoot);
+ file.writeMNode(subtreeRoot);
collectedVolatileSubtrees = new ArrayList<>();
volatileSubtreeIterator =
-
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(this.subtreeRoot);
+
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(subtreeRoot);
while (volatileSubtreeIterator.hasNext()) {
collectedVolatileSubtrees.add(volatileSubtreeIterator.next());
}
} catch (MetadataException | IOException e) {
logger.warn(
- "Error occurred during MTree flush, current node is {}",
- this.subtreeRoot.getFullPath(),
- e);
- cacheManager.updateCacheStatusAfterFlushFailure(this.subtreeRoot);
+ "Error occurred during MTree flush, current node is {}",
subtreeRoot.getFullPath(), e);
+ cacheManager.updateCacheStatusAfterFlushFailure(subtreeRoot);
throw e;
} finally {
lockManager.writeUnlock(subtreeRoot);
@@ -85,7 +129,6 @@ public class PBTreeFlushExecutor {
volatileSubtreeStack.push(collectedVolatileSubtrees.iterator());
Iterator<ICachedMNode> subtreeIterator;
- ICachedMNode subtreeRoot;
while (!volatileSubtreeStack.isEmpty()) {
subtreeIterator = volatileSubtreeStack.peek();
if (!subtreeIterator.hasNext()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
new file mode 100644
index 00000000000..0ef7a95d3c3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Scheduler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Scheduler.class);
+
+ /** configuration */
+ private int BATCH_FLUSH_SUBTREE = 50;
+
+ private int FLUSH_WORKER_NUM = 10;
+
+ /** data structure */
+ private final Map<Integer, CachedMTreeStore> regionToStore;
+ // flushingRegionSet is used to avoid flushing the same region concurrently;
its updates are
+ // guarded by synchronized
+ private final Set<Integer> flushingRegionSet;
+
+ private final ExecutorService workerPool;
+ private final IReleaseFlushStrategy releaseFlushStrategy;
+
+ public Scheduler(
+ Map<Integer, CachedMTreeStore> regionToStore,
+ Set<Integer> flushingRegionSet,
+ IReleaseFlushStrategy releaseFlushStrategy) {
+ this.regionToStore = regionToStore;
+ // When the thread pool is unable to handle a new task, it simply discards
the task without
+ // doing anything about it.
+ this.workerPool =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ FLUSH_WORKER_NUM,
+ ThreadName.PBTREE_FLUSH_PROCESSOR.getName(),
+ new ThreadPoolExecutor.DiscardPolicy());
+ this.flushingRegionSet = flushingRegionSet;
+ this.releaseFlushStrategy = releaseFlushStrategy;
+ }
+
+ /**
+ * Force flush all volatile subtrees and updated database MNodes to disk.
After flushing, the
+ * MNodes will be placed into node cache. This method blocks until the flush
task of every selected store
+ * has finished; IOException and MetadataException are logged, not rethrown.
+ */
+ public synchronized void forceFlushAll() {
+ List<Map.Entry<Integer, CachedMTreeStore>> flushEngineList = new
ArrayList<>();
+ for (Map.Entry<Integer, CachedMTreeStore> entry :
regionToStore.entrySet()) {
+ if (flushingRegionSet.contains(entry.getKey())) {
+ continue;
+ }
+ flushingRegionSet.add(entry.getKey());
+ flushEngineList.add(entry);
+ }
+ CompletableFuture.allOf(
+ flushEngineList.stream()
+ .map(
+ entry ->
+ CompletableFuture.runAsync(
+ () -> {
+ CachedMTreeStore store = entry.getValue();
+ int regionId = entry.getKey();
+ ICacheManager cacheManager =
store.getCacheManager();
+ ISchemaFile file = store.getSchemaFile();
+ LockManager lockManager = store.getLockManager();
+ long startTime = System.currentTimeMillis();
+ PBTreeFlushExecutor flushExecutor;
+ IDatabaseMNode<ICachedMNode> dbNode =
+
cacheManager.collectUpdatedStorageGroupMNodes();
+ if (dbNode != null) {
+ flushExecutor =
+ new PBTreeFlushExecutor(
+ dbNode, cacheManager, file,
lockManager);
+ try {
+ flushExecutor.flushDatabase();
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Error occurred during MTree flush,
current SchemaRegionId is {} because {}",
+ regionId,
+ e.getMessage(),
+ e);
+ }
+ }
+ flushExecutor =
+ new PBTreeFlushExecutor(
+ cacheManager.collectVolatileSubtrees(),
+ cacheManager,
+ file,
+ lockManager);
+ try {
+ flushExecutor.flushVolatileNodes();
+ } catch (MetadataException e) {
+ LOGGER.warn(
+ "Error occurred during MTree flush,
current SchemaRegionId is {} because {}",
+ regionId,
+ e.getMessage(),
+ e);
+ } finally {
+ long time = System.currentTimeMillis() -
startTime;
+ if (time > 10_000) {
+ LOGGER.info(
+ "It takes {}ms to flush MTree in
SchemaRegion {}",
+ time,
+ regionId);
+ } else {
+ LOGGER.debug(
+ "It takes {}ms to flush MTree in
SchemaRegion {}",
+ time,
+ regionId);
+ }
+ flushingRegionSet.remove(regionId);
+ }
+ },
+ workerPool))
+ .toArray(CompletableFuture[]::new))
+ .join();
+ }
+
+ /**
+ * Keep fetching evictable nodes from cacheManager until the memory status
is under safe mode or
+ * no node could be evicted. Update the memory status after evicting each
node.
+ *
+ * @param force true if force to evict all cache
+ */
+ public synchronized void scheduleRelease(boolean force) {
+ CompletableFuture.allOf(
+ regionToStore.values().stream()
+ .map(
+ store ->
+ CompletableFuture.runAsync(
+ () -> {
+ while (force ||
releaseFlushStrategy.isExceedReleaseThreshold()) {
+ // the store keeps releasing memory while forced or while
the release threshold is exceeded
+ if (store.executeMemoryRelease()) {
+ // if store can not release memory, break
+ break;
+ }
+ }
+ },
+ workerPool))
+ .toArray(CompletableFuture[]::new))
+ .join();
+ }
+
+ /**
+ * Select some subtrees to flush. The subtrees are selected from the
MTreeStore by the sequence of
+ * the regionIds. The number of subtrees to flush is determined by parameter
{@link
+ * Scheduler#BATCH_FLUSH_SUBTREE}. It will return asynchronously. If worker
pool is full, the task
+ * will be discarded directly.
+ *
+ * @param regionIds determine the MTreeStore to select subtrees, the head of
the list is the first
+ * MTreeStore to select subtrees
+ */
+ public synchronized void scheduleFlush(List<Integer> regionIds) {
+ AtomicInteger remainToFlush = new AtomicInteger(BATCH_FLUSH_SUBTREE);
+ for (int regionId : regionIds) {
+ if (flushingRegionSet.contains(regionId)) {
+ continue;
+ }
+ flushingRegionSet.add(regionId);
+ workerPool.submit(
+ () -> {
+ CachedMTreeStore store = regionToStore.get(regionId);
+ ICacheManager cacheManager = store.getCacheManager();
+ ISchemaFile file = store.getSchemaFile();
+ LockManager lockManager = store.getLockManager();
+ List<ICachedMNode> nodesToFlush = new ArrayList<>();
+ PBTreeFlushExecutor flushExecutor;
+ long startTime = System.currentTimeMillis();
+ try {
+ IDatabaseMNode<ICachedMNode> dbNode =
cacheManager.collectUpdatedStorageGroupMNodes();
+ if (dbNode != null) {
+ flushExecutor = new PBTreeFlushExecutor(dbNode, cacheManager,
file, lockManager);
+ flushExecutor.flushDatabase();
+ remainToFlush.decrementAndGet();
+ }
+ Iterator<ICachedMNode> volatileSubtrees =
cacheManager.collectVolatileSubtrees();
+ while (volatileSubtrees.hasNext()) {
+ nodesToFlush.add(volatileSubtrees.next());
+ if (nodesToFlush.size() > remainToFlush.get()) {
+ break;
+ }
+ }
+ flushExecutor =
+ new PBTreeFlushExecutor(nodesToFlush.iterator(),
cacheManager, file, lockManager);
+ flushExecutor.flushVolatileNodes();
+ } catch (MetadataException | IOException e) {
+ LOGGER.warn(
+ "Error occurred during MTree flush, current SchemaRegionId
is {} because {}",
+ regionId,
+ e.getMessage(),
+ e);
+ } finally {
+ long time = System.currentTimeMillis() - startTime;
+ if (time > 10_000) {
+ LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}",
time, regionId);
+ } else {
+ LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion
{}", time, regionId);
+ }
+ remainToFlush.addAndGet(-nodesToFlush.size());
+ flushingRegionSet.remove(regionId);
+ }
+ });
+ if (remainToFlush.get() <= 0) {
+ break;
+ }
+ }
+ }
+
+ public int getActiveWorkerNum() {
+ return ((WrappedThreadPoolExecutor) workerPool).getActiveCount();
+ }
+
+ public void clear() {
+ workerPool.shutdown();
+ }
+
+ public boolean isTerminated() {
+ return workerPool.isTerminated();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
index 83961523af9..fcca70de3e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
@@ -91,7 +91,7 @@ public class StampedWriterPreferredLock {
* re-entry within the same thread. Return directly if no thread holds a
write lock ; block and
* wait if another thread holds a write lock.
*
- * @param prior If false, it will also block and * wait if the write lock
waiting queue is not
+ * @param prior If false, it will also block and wait if the write lock
waiting queue is not
* empty.
*/
public void threadReadLock(boolean prior) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
index b9a584074aa..7d40fd3687b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/IReleaseFlushStrategy.java
@@ -22,7 +22,4 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontr
public interface IReleaseFlushStrategy {
/** Check if exceed release threshold */
boolean isExceedReleaseThreshold();
-
- /** Check if exceed flush threshold */
- boolean isExceedFlushThreshold();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
index fae84ec10a7..a3d5270a2fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
@@ -38,9 +38,4 @@ public class ReleaseFlushStrategyNumBasedImpl implements
IReleaseFlushStrategy {
return engineStatistics.getPinnedMNodeNum() +
engineStatistics.getUnpinnedMNodeNum()
> capacity * 0.6;
}
-
- @Override
- public boolean isExceedFlushThreshold() {
- return engineStatistics.getPinnedMNodeNum() +
engineStatistics.getUnpinnedMNodeNum() > capacity;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
index 266959c1b17..c0be7203f2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
@@ -27,25 +27,16 @@ public class ReleaseFlushStrategySizeBasedImpl implements
IReleaseFlushStrategy
private final CachedSchemaEngineStatistics engineStatistics;
private final long releaseThreshold;
- private final long flushThreshold;
-
- public static final double RELEASE_THRESHOLD_RATIO = 0.6;
- public static final double FLUSH_THRESHOLD_RATION = 0.75;
+ public static final double RELEASE_THRESHOLD_RATIO = 0.70;
public ReleaseFlushStrategySizeBasedImpl(CachedSchemaEngineStatistics
engineStatistics) {
this.engineStatistics = engineStatistics;
long capacity =
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion();
this.releaseThreshold = (long) (capacity * RELEASE_THRESHOLD_RATIO);
- this.flushThreshold = (long) (capacity * FLUSH_THRESHOLD_RATION);
}
@Override
public boolean isExceedReleaseThreshold() {
return engineStatistics.getMemoryUsage() > releaseThreshold;
}
-
- @Override
- public boolean isExceedFlushThreshold() {
- return engineStatistics.getMemoryUsage() > flushThreshold;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 015bd38fb06..27a634d6443 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -324,19 +324,14 @@ public abstract class PageManager implements IPageManager
{
if (!child.isMeasurement()) {
alias = null;
- if (getNodeAddress(child) >= 0) {
- // new child with a valid segment address, weird
- throw new MetadataException(
- String.format(
- "A child [%s] in newChildBuffer shall not have
segmentAddress.",
- child.getFullPath()));
- }
-
- // pre-allocate except that child is a device node using template
- if (!(child.isDevice() && child.getAsDeviceMNode().isUseTemplate())) {
+ // TODO optimization if many device only using template but has no
child
+ if (getNodeAddress(child) < 0) {
short estSegSize = estimateSegmentSize(child);
long glbIndex = preAllocateSegment(estSegSize, cxt);
SchemaFile.setNodeAddress(child, glbIndex);
+ } else {
+ // new child with a valid segment address could be maliciously
modified
+ throw new MetadataException("A child in newChildBuffer shall not
have segmentAddress.");
}
} else {
alias =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
index b116bc1950b..86b1524bf75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.commons.schema.tree.AbstractTreeVisitor;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.IMTreeStore;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterator.MNodeIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.ReentrantReadOnlyCachedMTreeStore;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -75,6 +76,8 @@ public abstract class Traverser<R, N extends IMNode<N>>
extends AbstractTreeVisi
protected boolean isPrefixMatch = false;
private IDeviceMNode<N> skipTemplateDevice;
+ private ReleaseFlushMonitor.RecordNode timeRecorder;
+
protected Traverser() {}
/**
@@ -104,6 +107,7 @@ public abstract class Traverser<R, N extends IMNode<N>>
extends AbstractTreeVisi
}
this.startNode = startNode;
this.nodes = nodes;
+ this.timeRecorder = store.recordTraverserStatistics();
}
/**
@@ -120,6 +124,7 @@ public abstract class Traverser<R, N extends IMNode<N>>
extends AbstractTreeVisi
this.store = store.getWithReentrantReadLock();
initStack();
this.startNode = startNode;
+ this.timeRecorder = store.recordTraverserStatistics();
}
/**
@@ -250,9 +255,11 @@ public abstract class Traverser<R, N extends IMNode<N>>
extends AbstractTreeVisi
public void close() {
super.close();
if (store instanceof ReentrantReadOnlyCachedMTreeStore) {
- // TODO update here
((ReentrantReadOnlyCachedMTreeStore) store).unlockRead();
}
+ if (timeRecorder != null) {
+ timeRecorder.setEndTime(System.currentTimeMillis());
+ }
}
public void setTemplateMap(Map<Integer, Template> templateMap,
IMNodeFactory<N> nodeFactory) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/MonitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/MonitorTest.java
new file mode 100644
index 00000000000..cade004d72d
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/MonitorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.mtree.schemafile;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class MonitorTest {
+ private ReleaseFlushMonitor releaseFlushMonitor;
+
+ @Before
+ public void setUp() {
+ releaseFlushMonitor = ReleaseFlushMonitor.getInstance();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ releaseFlushMonitor.clear();
+ }
+
+ @Test
+ public void testGetRegionsToFlush() {
+ // free = 500
+ setRecord(
+ 1, Arrays.asList(0L, 2L, 3L, 3000L, 4000L), Arrays.asList(100L, 2500L,
200L, 5000L, 6000L));
+ // free = 2000
+ setRecord(
+ 2, Arrays.asList(0L, 2L, 3L, 3000L, 4000L), Arrays.asList(100L, 1000L,
200L, 5500L, 6000L));
+ // free =700
+ setRecord(3, Arrays.asList(700L, 800L), Arrays.asList(900L, 6000L));
+ // free =1700
+ setRecord(4, Arrays.asList(700L, 800L, 2500L), Arrays.asList(1000L, 1500L,
5000L));
+ // free =2100
+ setRecord(5, Arrays.asList(0L, 2000L), Arrays.asList(1000L, 3900L));
+ List<Integer> regions = releaseFlushMonitor.getRegionsToFlush(5000);
+ Assert.assertEquals(3, regions.size());
+ Assert.assertEquals(5, regions.get(0).intValue());
+ Assert.assertEquals(2, regions.get(1).intValue());
+ Assert.assertEquals(4, regions.get(2).intValue());
+ }
+
+ @Test
+ public void testGetRegionsToFlush2() {
+ setRecord(1, Arrays.asList(0L, 2000L), Arrays.asList(100L, 7000L));
+ setRecord(2, Collections.singletonList(3000L),
Collections.singletonList(3500L));
+ List<Integer> regions = releaseFlushMonitor.getRegionsToFlush(7000);
+ Assert.assertEquals(1, regions.size());
+ Assert.assertEquals(2, regions.get(0).intValue());
+ }
+
+ private void setRecord(int regionId, List<Long> startTimes, List<Long>
eneTimes) {
+ releaseFlushMonitor.initRecordList(regionId);
+ for (int i = 0; i < startTimes.size(); i++) {
+ ReleaseFlushMonitor.RecordNode node =
releaseFlushMonitor.recordTraverserTime(regionId);
+ node.setStartTime(startTimes.get(i));
+ node.setEndTime(eneTimes.get(i));
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index bef220b7c3a..7a07e03172a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -28,7 +28,7 @@ import
org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaRegionStatistics;
import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ReleaseFlushMonitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
@@ -76,7 +76,7 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
IMNodeFactory<ICachedMNode> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
// wait release and flush task
- Thread.sleep(1000);
+ Thread.sleep(6000);
// schemaRegion1
IMNode<ICachedMNode> sg1 =
nodeFactory.createDatabaseMNode(
@@ -91,8 +91,7 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
Assert.assertEquals(
size1 + nodeFactory.createDeviceMNode(sg1.getAsMNode(),
"n").estimateSize(),
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
- CacheMemoryManager.getInstance().forceFlushAndRelease();
- Thread.sleep(1000);
+ ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
Assert.assertEquals(
size1,
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
}
@@ -267,7 +266,7 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
Assert.assertTrue(
d0ExistSize ==
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()
|| d1ExistSize ==
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
- CacheMemoryManager.getInstance().forceFlushAndRelease();
+ ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
// wait release and flush task
Thread.sleep(1000);
Assert.assertEquals(
@@ -453,7 +452,7 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
if (0 != cachedRegionStatistics1.getUnpinnedMNodeNum()) {
// "d0" may remain in PartialMemory mode
Assert.assertEquals("PBTree-PartialMemory",
testParams.getTestModeName());
- CacheMemoryManager.getInstance().forceFlushAndRelease();
+ ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
Thread.sleep(1000);
Assert.assertEquals(0,
cachedRegionStatistics1.getUnpinnedMNodeNum());
}
diff --git a/iotdb-core/datanode/src/test/resources/logback-test.xml
b/iotdb-core/datanode/src/test/resources/logback-test.xml
index 8692100d47d..dc90663f6cd 100644
--- a/iotdb-core/datanode/src/test/resources/logback-test.xml
+++ b/iotdb-core/datanode/src/test/resources/logback-test.xml
@@ -44,6 +44,7 @@
<logger name="org.apache.iotdb.db.sync" level="INFO"/>
<logger name="org.apache.iotdb.db.storageengine.merge" level="INFO"/>
<logger name="org.apache.iotdb.db.metadata" level="INFO"/>
+ <logger name="org.apache.iotdb.db.schemaengine" level="INFO"/>
<logger name="org.apache.iotdb.commons.service.ThriftServiceThread"
level="INFO"/>
<logger name="org.eclipse.jetty.util.thread.QueuedThreadPool"
level="INFO"/>
<logger name="org.apache.iotdb.commons.service.metric.MetricService"
level="INFO"/>
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 54ad69bbb9c..343711f0bab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
@@ -79,6 +80,27 @@ public class IoTDBThreadPoolFactory {
poolName);
}
+ /**
+ * see {@link Executors#newFixedThreadPool(int,
java.util.concurrent.ThreadFactory)}.
+ *
+ * @param poolName - the name of thread pool
+ * @return fixed size thread pool
+ */
+ public static ExecutorService newFixedThreadPool(
+ int nThreads, String poolName, RejectedExecutionHandler handler) {
+ logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
+
+ return new WrappedThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new IoTThreadFactory(poolName),
+ poolName,
+ handler);
+ }
+
public static ExecutorService newFixedThreadPoolWithDaemonThread(int
nThreads, String poolName) {
logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
return new WrappedSingleThreadExecutorService(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index bd6c8097f1e..dbc26565b85 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -65,10 +65,10 @@ public enum ThreadName {
// -------------------------- SchemaEngine --------------------------
SCHEMA_REGION_RELEASE_PROCESSOR("SchemaRegion-Release-Task-Processor"),
SCHEMA_REGION_RECOVER_TASK("SchemaRegion-recover-task"),
- SCHEMA_RELEASE_MONITOR("Schema-Release-Task-Monitor"),
- SCHEMA_REGION_FLUSH_PROCESSOR("SchemaRegion-Flush-Task-Processor"),
- SCHEMA_FLUSH_MONITOR("Schema-Flush-Task-Monitor"),
SCHEMA_FORCE_MLOG("SchemaEngine-TimedForceMLog-Thread"),
+ PBTREE_RELEASE_MONITOR("PBTree-Release-Task-Monitor"),
+ PBTREE_FLUSH_MONITOR("PBTree-Flush-Monitor"),
+ PBTREE_FLUSH_PROCESSOR("PBTree-Flush-Processor"),
// -------------------------- ClientService --------------------------
CLIENT_RPC_SERVICE("ClientRPC-Service"),
CLIENT_RPC_PROCESSOR("ClientRPC-Processor"),
@@ -220,10 +220,10 @@ public enum ThreadName {
Arrays.asList(
SCHEMA_REGION_RELEASE_PROCESSOR,
SCHEMA_REGION_RECOVER_TASK,
- SCHEMA_RELEASE_MONITOR,
- SCHEMA_REGION_FLUSH_PROCESSOR,
- SCHEMA_FLUSH_MONITOR,
- SCHEMA_FORCE_MLOG));
+ PBTREE_RELEASE_MONITOR,
+ SCHEMA_FORCE_MLOG,
+ PBTREE_FLUSH_MONITOR,
+ PBTREE_FLUSH_PROCESSOR));
private static final Set<ThreadName> clientServiceThreadNames =
new HashSet<>(Arrays.asList(CLIENT_RPC_SERVICE, CLIENT_RPC_PROCESSOR));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index d81c97f8454..49f914f5c47 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -73,6 +74,23 @@ public class WrappedThreadPoolExecutor extends
ThreadPoolExecutor
ThreadPoolMetrics.getInstance().registerThreadPool(this, this.mbeanName);
}
+ public WrappedThreadPoolExecutor(
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ IoTThreadFactory ioTThreadFactory,
+ String mbeanName,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
ioTThreadFactory, handler);
+ this.mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_JMX_NAME,
IoTDBConstant.JMX_TYPE, mbeanName);
+ JMXService.registerMBean(this, this.mbeanName);
+ ThreadPoolMetrics.getInstance().registerThreadPool(this, this.mbeanName);
+ }
+
@Override
public void shutdown() {
super.shutdown();