This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
new e0f961f5069 Ensure PBTree DROP and FLUSH concurrently (#11731)
e0f961f5069 is described below
commit e0f961f506961519ca4a4a0b2980888cfa51c3d9
Author: Chen YZ <[email protected]>
AuthorDate: Mon Dec 18 10:54:25 2023 +0800
Ensure PBTree DROP and FLUSH concurrently (#11731)
* done
* release add read lock
* Encapsulate limit logic into PBTreeFlushExecutor
---
.../mtree/impl/pbtree/CachedMTreeStore.java | 27 ++++------
.../impl/pbtree/flush/PBTreeFlushExecutor.java | 36 ++++++++-----
.../mtree/impl/pbtree/flush/Scheduler.java | 63 ++++++----------------
.../iotdb/commons/concurrent/ThreadName.java | 6 +--
4 files changed, 53 insertions(+), 79 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 708173bca5a..4136f80942e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -21,7 +21,6 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
@@ -610,10 +609,15 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
* @return should not continue releasing
*/
public boolean executeMemoryRelease() {
- if (regionStatistics.getUnpinnedMemorySize() != 0) {
- return !cacheManager.evict();
- } else {
- return true;
+ lockManager.globalReadUnlock();
+ try {
+ if (regionStatistics.getUnpinnedMemorySize() != 0) {
+ return !cacheManager.evict();
+ } else {
+ return true;
+ }
+ } finally {
+ lockManager.globalReadUnlock();
}
}
@@ -623,19 +627,8 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
lockManager.globalReadLock();
}
try {
- PBTreeFlushExecutor flushExecutor;
- IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode =
- cacheManager.collectUpdatedStorageGroupMNodes();
- if (updatedStorageGroupMNode != null) {
- flushExecutor =
- new PBTreeFlushExecutor(updatedStorageGroupMNode, cacheManager,
file, lockManager);
- flushExecutor.flushDatabase();
- }
-
- Iterator<ICachedMNode> volatileSubtrees =
cacheManager.collectVolatileSubtrees();
-
+ PBTreeFlushExecutor flushExecutor = new
PBTreeFlushExecutor(cacheManager, file, lockManager);
long startTime = System.currentTimeMillis();
- flushExecutor = new PBTreeFlushExecutor(volatileSubtrees, cacheManager,
file, lockManager);
flushExecutor.flushVolatileNodes();
long time = System.currentTimeMillis() - startTime;
if (time > 10_000) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
index b1b59b630c7..b7526574fb5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
@@ -32,38 +32,41 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
public class PBTreeFlushExecutor {
private static final Logger logger =
LoggerFactory.getLogger(PBTreeFlushExecutor.class);
private final Iterator<ICachedMNode> subtreeRoots;
+ private final IDatabaseMNode<ICachedMNode> databaseMNode;
+ private final AtomicInteger remainToFlush;
private final ICacheManager cacheManager;
private final ISchemaFile file;
private final LockManager lockManager;
public PBTreeFlushExecutor(
- Iterator<ICachedMNode> subtreeRoots,
- ICacheManager cacheManager,
- ISchemaFile file,
- LockManager lockManager) {
- this.subtreeRoots = subtreeRoots;
+ ICacheManager cacheManager, ISchemaFile file, LockManager lockManager) {
+ this.remainToFlush = null;
+ this.subtreeRoots = cacheManager.collectVolatileSubtrees();
+ this.databaseMNode = cacheManager.collectUpdatedStorageGroupMNodes();
this.cacheManager = cacheManager;
this.file = file;
this.lockManager = lockManager;
}
public PBTreeFlushExecutor(
- IDatabaseMNode<ICachedMNode> databaseMNode,
+ AtomicInteger remainToFlush,
ICacheManager cacheManager,
ISchemaFile file,
LockManager lockManager) {
- this.subtreeRoots =
Collections.singletonList(databaseMNode.getAsMNode()).iterator();
+ this.remainToFlush = remainToFlush;
+ this.subtreeRoots = cacheManager.collectVolatileSubtrees();
+ this.databaseMNode = cacheManager.collectUpdatedStorageGroupMNodes();
this.cacheManager = cacheManager;
this.file = file;
this.lockManager = lockManager;
@@ -71,7 +74,14 @@ public class PBTreeFlushExecutor {
public void flushVolatileNodes() throws MetadataException {
List<Exception> exceptions = new ArrayList<>();
- while (subtreeRoots.hasNext()) {
+ if (databaseMNode != null && checkRemainToFlush()) {
+ try {
+ processFlushDatabase(databaseMNode);
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ while (subtreeRoots.hasNext() && checkRemainToFlush()) {
try {
processFlushNonDatabase(subtreeRoots.next());
} catch (Exception e) {
@@ -84,11 +94,11 @@ public class PBTreeFlushExecutor {
}
}
- public void flushDatabase() throws IOException {
- while (subtreeRoots.hasNext()) {
- ICachedMNode subtreeRoot = subtreeRoots.next();
- processFlushDatabase(subtreeRoot.getAsDatabaseMNode());
+ private boolean checkRemainToFlush() {
+ if (remainToFlush == null) {
+ return true;
}
+ return remainToFlush.decrementAndGet() >= 0;
}
private void processFlushDatabase(IDatabaseMNode<ICachedMNode>
updatedStorageGroupMNode)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index 0ef7a95d3c3..bf387b1ea64 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -23,20 +23,16 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,7 +68,7 @@ public class Scheduler {
this.workerPool =
IoTDBThreadPoolFactory.newFixedThreadPool(
FLUSH_WORKER_NUM,
- ThreadName.PBTREE_FLUSH_PROCESSOR.getName(),
+ ThreadName.PBTREE_WORKER_POOL.getName(),
new ThreadPoolExecutor.DiscardPolicy());
this.flushingRegionSet = flushingRegionSet;
this.releaseFlushStrategy = releaseFlushStrategy;
@@ -104,30 +100,14 @@ public class Scheduler {
ISchemaFile file = store.getSchemaFile();
LockManager lockManager = store.getLockManager();
long startTime = System.currentTimeMillis();
- PBTreeFlushExecutor flushExecutor;
- IDatabaseMNode<ICachedMNode> dbNode =
-
cacheManager.collectUpdatedStorageGroupMNodes();
- if (dbNode != null) {
- flushExecutor =
- new PBTreeFlushExecutor(
- dbNode, cacheManager, file,
lockManager);
- try {
- flushExecutor.flushDatabase();
- } catch (IOException e) {
- LOGGER.warn(
- "Error occurred during MTree flush,
current SchemaRegionId is {} because {}",
- regionId,
- e.getMessage(),
- e);
- }
- }
- flushExecutor =
- new PBTreeFlushExecutor(
- cacheManager.collectVolatileSubtrees(),
- cacheManager,
- file,
- lockManager);
try {
+ lockManager.globalReadLock();
+ if (file == null) {
+ // store has been closed
+ return;
+ }
+ PBTreeFlushExecutor flushExecutor =
+ new PBTreeFlushExecutor(cacheManager,
file, lockManager);
flushExecutor.flushVolatileNodes();
} catch (MetadataException e) {
LOGGER.warn(
@@ -148,6 +128,7 @@ public class Scheduler {
time,
regionId);
}
+ lockManager.globalReadUnlock();
flushingRegionSet.remove(regionId);
}
},
@@ -204,27 +185,17 @@ public class Scheduler {
ICacheManager cacheManager = store.getCacheManager();
ISchemaFile file = store.getSchemaFile();
LockManager lockManager = store.getLockManager();
- List<ICachedMNode> nodesToFlush = new ArrayList<>();
- PBTreeFlushExecutor flushExecutor;
long startTime = System.currentTimeMillis();
try {
- IDatabaseMNode<ICachedMNode> dbNode =
cacheManager.collectUpdatedStorageGroupMNodes();
- if (dbNode != null) {
- flushExecutor = new PBTreeFlushExecutor(dbNode, cacheManager,
file, lockManager);
- flushExecutor.flushDatabase();
- remainToFlush.decrementAndGet();
- }
- Iterator<ICachedMNode> volatileSubtrees =
cacheManager.collectVolatileSubtrees();
- while (volatileSubtrees.hasNext()) {
- nodesToFlush.add(volatileSubtrees.next());
- if (nodesToFlush.size() > remainToFlush.get()) {
- break;
- }
+ lockManager.globalReadLock();
+ if (file == null) {
+ // store has been closed
+ return;
}
- flushExecutor =
- new PBTreeFlushExecutor(nodesToFlush.iterator(),
cacheManager, file, lockManager);
+ PBTreeFlushExecutor flushExecutor =
+ new PBTreeFlushExecutor(remainToFlush, cacheManager, file,
lockManager);
flushExecutor.flushVolatileNodes();
- } catch (MetadataException | IOException e) {
+ } catch (MetadataException e) {
LOGGER.warn(
"Error occurred during MTree flush, current SchemaRegionId
is {} because {}",
regionId,
@@ -237,7 +208,7 @@ public class Scheduler {
} else {
LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion
{}", time, regionId);
}
- remainToFlush.addAndGet(-nodesToFlush.size());
+ lockManager.globalReadUnlock();
flushingRegionSet.remove(regionId);
}
});
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index dbc26565b85..423e3c57b0c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -64,11 +64,11 @@ public enum ThreadName {
TIMED_FLUSH_UNSEQ_MEMTABLE("Timed-Flush-Unseq-Memtable"),
// -------------------------- SchemaEngine --------------------------
SCHEMA_REGION_RELEASE_PROCESSOR("SchemaRegion-Release-Task-Processor"),
- SCHEMA_REGION_RECOVER_TASK("SchemaRegion-recover-task"),
+ SCHEMA_REGION_RECOVER_TASK("SchemaRegion-Recover-Task"),
SCHEMA_FORCE_MLOG("SchemaEngine-TimedForceMLog-Thread"),
PBTREE_RELEASE_MONITOR("PBTree-Release-Task-Monitor"),
PBTREE_FLUSH_MONITOR("PBTree-Flush-Monitor"),
- PBTREE_FLUSH_PROCESSOR("PBTree-Flush-Processor"),
+ PBTREE_WORKER_POOL("PBTree-Worker-Pool"),
// -------------------------- ClientService --------------------------
CLIENT_RPC_SERVICE("ClientRPC-Service"),
CLIENT_RPC_PROCESSOR("ClientRPC-Processor"),
@@ -223,7 +223,7 @@ public enum ThreadName {
PBTREE_RELEASE_MONITOR,
SCHEMA_FORCE_MLOG,
PBTREE_FLUSH_MONITOR,
- PBTREE_FLUSH_PROCESSOR));
+ PBTREE_WORKER_POOL));
private static final Set<ThreadName> clientServiceThreadNames =
new HashSet<>(Arrays.asList(CLIENT_RPC_SERVICE, CLIENT_RPC_PROCESSOR));