This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 020b8a52885 Extract common code and fix PBTree UT (#11890)
020b8a52885 is described below
commit 020b8a52885e8bc98db728291f1cd46f437484b1
Author: Chen YZ <[email protected]>
AuthorDate: Fri Jan 12 18:11:55 2024 +0800
Extract common code and fix PBTree UT (#11890)
---
.../mtree/impl/pbtree/flush/Scheduler.java | 150 ++++++++++-----------
.../impl/pbtree/memory/ReleaseFlushMonitor.java | 4 +-
.../schemaRegion/SchemaStatisticsTest.java | 2 +-
3 files changed, 73 insertions(+), 83 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index 579db96c5e8..5a22b23492b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -75,12 +75,60 @@ public class Scheduler {
this.releaseFlushStrategy = releaseFlushStrategy;
}
+ private void executeFlush(CachedMTreeStore store, int regionId,
AtomicInteger remainToFlush) {
+ IMemoryManager memoryManager = store.getMemoryManager();
+ ISchemaFile file = store.getSchemaFile();
+ LockManager lockManager = store.getLockManager();
+ long startTime = System.currentTimeMillis();
+ AtomicLong flushNodeNum = new AtomicLong(0);
+ AtomicLong flushMemSize = new AtomicLong(0);
+ try {
+ PBTreeFlushExecutor flushExecutor;
+ if (remainToFlush == null) {
+ flushExecutor = new PBTreeFlushExecutor(memoryManager, file,
lockManager);
+ } else {
+ flushExecutor = new PBTreeFlushExecutor(remainToFlush, memoryManager,
file, lockManager);
+ }
+ flushExecutor.flushVolatileNodes(flushNodeNum, flushMemSize);
+ } catch (MetadataException e) {
+ LOGGER.warn(
+ "Error occurred during MTree flush, current SchemaRegionId is {}
because {}",
+ regionId,
+ e.getMessage(),
+ e);
+ } finally {
+ long time = System.currentTimeMillis() - startTime;
+ if (time > 10_000) {
+ LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time,
regionId);
+ } else {
+ LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", time,
regionId);
+ }
+ store.recordFlushMetrics(time, flushNodeNum.get(), flushMemSize.get());
+ flushingRegionSet.remove(regionId);
+ }
+ }
+
+ private void executeRelease(CachedMTreeStore store, boolean force) {
+ AtomicLong releaseNodeNum = new AtomicLong(0);
+ AtomicLong releaseMemorySize = new AtomicLong(0);
+ long startTime = System.currentTimeMillis();
+ while (force || releaseFlushStrategy.isExceedReleaseThreshold()) {
+ // the store keeps trying to release memory while forced or while the release threshold is exceeded
+ if (store.executeMemoryRelease(releaseNodeNum, releaseMemorySize)) {
+ // if store can not release memory, break
+ break;
+ }
+ }
+ store.recordReleaseMetrics(
+ System.currentTimeMillis() - startTime, releaseNodeNum.get(),
releaseMemorySize.get());
+ }
+
/**
* Force flush all volatile subtrees and updated database MNodes to disk.
After flushing, the
* MNodes will be placed into node cache. This method returns a future;
join it to wait until all stores
* have been flushed.
*/
- public synchronized CompletableFuture<Void> forceFlushAll() {
+ public synchronized CompletableFuture<Void> scheduleFlushAll() {
List<Map.Entry<Integer, CachedMTreeStore>> flushEngineList = new
ArrayList<>();
for (Map.Entry<Integer, CachedMTreeStore> entry :
regionToStore.entrySet()) {
if (flushingRegionSet.contains(entry.getKey())) {
@@ -101,43 +149,17 @@ public class Scheduler {
// store has been closed
return;
}
- IMemoryManager memoryManager =
store.getMemoryManager();
- ISchemaFile file = store.getSchemaFile();
LockManager lockManager = store.getLockManager();
- long startTime = System.currentTimeMillis();
- AtomicLong flushNodeNum = new AtomicLong(0);
- AtomicLong flushMemSize = new AtomicLong(0);
+ lockManager.globalReadLock();
+ if (!regionToStore.containsKey(regionId)) {
+ // double check that the store has not been closed; NOTE(review): this return skips globalReadUnlock()
+ return;
+ }
try {
- lockManager.globalReadLock();
- if (!regionToStore.containsKey(regionId)) {
- // double check store have not been closed
- return;
- }
- PBTreeFlushExecutor flushExecutor =
- new PBTreeFlushExecutor(memoryManager, file,
lockManager);
- flushExecutor.flushVolatileNodes(flushNodeNum,
flushMemSize);
- } catch (MetadataException e) {
- LOGGER.warn(
- "Error occurred during MTree flush, current
SchemaRegionId is {} because {}",
- regionId,
- e.getMessage(),
- e);
+ executeFlush(store, regionId, null);
+ executeRelease(store, false);
} finally {
- long time = System.currentTimeMillis() - startTime;
- if (time > 10_000) {
- LOGGER.info(
- "It takes {}ms to flush MTree in
SchemaRegion {}",
- time,
- regionId);
- } else {
- LOGGER.debug(
- "It takes {}ms to flush MTree in
SchemaRegion {}",
- time,
- regionId);
- }
- store.recordFlushMetrics(time, flushNodeNum.get(),
flushMemSize.get());
lockManager.globalReadUnlock();
- flushingRegionSet.remove(regionId);
}
},
workerPool))
@@ -159,28 +181,18 @@ public class Scheduler {
() -> {
int regionId = entry.getKey();
CachedMTreeStore store = entry.getValue();
+ if (store == null) {
+ // store has been closed
+ return;
+ }
+ LockManager lockManager = store.getLockManager();
+ lockManager.globalReadLock(true);
if (!regionToStore.containsKey(regionId)) {
// double check that the store has not been closed; NOTE(review): this return skips globalReadUnlock()
return;
}
- LockManager lockManager = store.getLockManager();
try {
- lockManager.globalReadLock(true);
- AtomicLong releaseNodeNum = new AtomicLong(0);
- AtomicLong releaseMemorySize = new
AtomicLong(0);
- long startTime = System.currentTimeMillis();
- while (force ||
releaseFlushStrategy.isExceedReleaseThreshold()) {
- // store try to release memory if not exceed
release threshold
- if (store.executeMemoryRelease(
- releaseNodeNum, releaseMemorySize)) {
- // if store can not release memory, break
- break;
- }
- }
- store.recordReleaseMetrics(
- System.currentTimeMillis() - startTime,
- releaseNodeNum.get(),
- releaseMemorySize.get());
+ executeRelease(store, force);
} finally {
lockManager.globalReadUnlock();
}
@@ -213,39 +225,17 @@ public class Scheduler {
// store has been closed
return;
}
- IMemoryManager memoryManager = store.getMemoryManager();
- ISchemaFile file = store.getSchemaFile();
LockManager lockManager = store.getLockManager();
- long startTime = System.currentTimeMillis();
- AtomicLong flushNodeNum = new AtomicLong(0);
- AtomicLong flushMemSize = new AtomicLong(0);
+ lockManager.globalReadLock();
+ if (!regionToStore.containsKey(regionId)) {
+ // double check that the store has not been closed; NOTE(review): this return skips globalReadUnlock()
+ return;
+ }
try {
- lockManager.globalReadLock();
- if (!regionToStore.containsKey(regionId)) {
- // double check store have not been closed
- return;
- }
- PBTreeFlushExecutor flushExecutor =
- new PBTreeFlushExecutor(remainToFlush, memoryManager, file,
lockManager);
- flushExecutor.flushVolatileNodes(flushNodeNum, flushMemSize);
- } catch (MetadataException e) {
- LOGGER.warn(
- "Error occurred during MTree flush, current SchemaRegionId
is {} because {}",
- regionId,
- e.getMessage(),
- e);
- } catch (Throwable e) {
- LOGGER.error("Error occurred during PBTree flush", e);
+
+ executeFlush(store, regionId, remainToFlush);
} finally {
- long time = System.currentTimeMillis() - startTime;
- if (time > 10_000) {
- LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}",
time, regionId);
- } else {
- LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion
{}", time, regionId);
- }
- store.recordFlushMetrics(time, flushNodeNum.get(),
flushMemSize.get());
lockManager.globalReadUnlock();
- flushingRegionSet.remove(regionId);
}
});
if (remainToFlush.get() <= 0) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
index cf367416968..81dbd816ddb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
@@ -119,7 +119,7 @@ public class ReleaseFlushMonitor {
// 2. if it still exceeds release threshold, it will try to
flush node buffer, then
// release node cache again
if (releaseFlushStrategy.isExceedReleaseThreshold()) {
- scheduler.forceFlushAll();
+ scheduler.scheduleFlushAll();
regionToTraverserTime.values().forEach(RecordList::clear);
}
synchronized (blockObject) {
@@ -238,7 +238,7 @@ public class ReleaseFlushMonitor {
}
}
if (needFlush) {
- scheduler.forceFlushAll().join();
+ scheduler.scheduleFlushAll().join();
scheduler.scheduleRelease(true);
} else {
break;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index da8d88b34c3..5b7c4dd6589 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -124,7 +124,7 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
IMNodeFactory<?> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
// wait for the release and flush tasks to finish
- Thread.sleep(6500);
+ Thread.sleep(1000);
// schemaRegion1
IMNode<?> sg1 =
nodeFactory.createDatabaseMNode(