This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
     new e0f961f5069 Ensure PBTree DROP and FLUSH concurrently (#11731)
e0f961f5069 is described below

commit e0f961f506961519ca4a4a0b2980888cfa51c3d9
Author: Chen YZ <[email protected]>
AuthorDate: Mon Dec 18 10:54:25 2023 +0800

    Ensure PBTree DROP and FLUSH concurrently (#11731)
    
    * done
    
    * release add read lock
    
    * Encapsulate limit logic into PBTreeFlushExecutor
---
 .../mtree/impl/pbtree/CachedMTreeStore.java        | 27 ++++------
 .../impl/pbtree/flush/PBTreeFlushExecutor.java     | 36 ++++++++-----
 .../mtree/impl/pbtree/flush/Scheduler.java         | 63 ++++++----------------
 .../iotdb/commons/concurrent/ThreadName.java       |  6 +--
 4 files changed, 53 insertions(+), 79 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 708173bca5a..4136f80942e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -21,7 +21,6 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
 import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
@@ -610,10 +609,15 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
    * @return should not continue releasing
    */
   public boolean executeMemoryRelease() {
-    if (regionStatistics.getUnpinnedMemorySize() != 0) {
-      return !cacheManager.evict();
-    } else {
-      return true;
+    lockManager.globalReadLock();
+    try {
+      if (regionStatistics.getUnpinnedMemorySize() != 0) {
+        return !cacheManager.evict();
+      } else {
+        return true;
+      }
+    } finally {
+      lockManager.globalReadUnlock();
     }
   }
 
@@ -623,19 +627,8 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
       lockManager.globalReadLock();
     }
     try {
-      PBTreeFlushExecutor flushExecutor;
-      IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode =
-          cacheManager.collectUpdatedStorageGroupMNodes();
-      if (updatedStorageGroupMNode != null) {
-        flushExecutor =
-            new PBTreeFlushExecutor(updatedStorageGroupMNode, cacheManager, 
file, lockManager);
-        flushExecutor.flushDatabase();
-      }
-
-      Iterator<ICachedMNode> volatileSubtrees = 
cacheManager.collectVolatileSubtrees();
-
+      PBTreeFlushExecutor flushExecutor = new 
PBTreeFlushExecutor(cacheManager, file, lockManager);
       long startTime = System.currentTimeMillis();
-      flushExecutor = new PBTreeFlushExecutor(volatileSubtrees, cacheManager, 
file, lockManager);
       flushExecutor.flushVolatileNodes();
       long time = System.currentTimeMillis() - startTime;
       if (time > 10_000) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
index b1b59b630c7..b7526574fb5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
@@ -32,38 +32,41 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class PBTreeFlushExecutor {
 
   private static final Logger logger = 
LoggerFactory.getLogger(PBTreeFlushExecutor.class);
 
   private final Iterator<ICachedMNode> subtreeRoots;
+  private final IDatabaseMNode<ICachedMNode> databaseMNode;
+  private final AtomicInteger remainToFlush;
 
   private final ICacheManager cacheManager;
   private final ISchemaFile file;
   private final LockManager lockManager;
 
   public PBTreeFlushExecutor(
-      Iterator<ICachedMNode> subtreeRoots,
-      ICacheManager cacheManager,
-      ISchemaFile file,
-      LockManager lockManager) {
-    this.subtreeRoots = subtreeRoots;
+      ICacheManager cacheManager, ISchemaFile file, LockManager lockManager) {
+    this.remainToFlush = null;
+    this.subtreeRoots = cacheManager.collectVolatileSubtrees();
+    this.databaseMNode = cacheManager.collectUpdatedStorageGroupMNodes();
     this.cacheManager = cacheManager;
     this.file = file;
     this.lockManager = lockManager;
   }
 
   public PBTreeFlushExecutor(
-      IDatabaseMNode<ICachedMNode> databaseMNode,
+      AtomicInteger remainToFlush,
       ICacheManager cacheManager,
       ISchemaFile file,
       LockManager lockManager) {
-    this.subtreeRoots = 
Collections.singletonList(databaseMNode.getAsMNode()).iterator();
+    this.remainToFlush = remainToFlush;
+    this.subtreeRoots = cacheManager.collectVolatileSubtrees();
+    this.databaseMNode = cacheManager.collectUpdatedStorageGroupMNodes();
     this.cacheManager = cacheManager;
     this.file = file;
     this.lockManager = lockManager;
@@ -71,7 +74,14 @@ public class PBTreeFlushExecutor {
 
   public void flushVolatileNodes() throws MetadataException {
     List<Exception> exceptions = new ArrayList<>();
-    while (subtreeRoots.hasNext()) {
+    if (databaseMNode != null && checkRemainToFlush()) {
+      try {
+        processFlushDatabase(databaseMNode);
+      } catch (Exception e) {
+        exceptions.add(e);
+      }
+    }
+    while (subtreeRoots.hasNext() && checkRemainToFlush()) {
       try {
         processFlushNonDatabase(subtreeRoots.next());
       } catch (Exception e) {
@@ -84,11 +94,11 @@ public class PBTreeFlushExecutor {
     }
   }
 
-  public void flushDatabase() throws IOException {
-    while (subtreeRoots.hasNext()) {
-      ICachedMNode subtreeRoot = subtreeRoots.next();
-      processFlushDatabase(subtreeRoot.getAsDatabaseMNode());
+  private boolean checkRemainToFlush() {
+    if (remainToFlush == null) {
+      return true;
     }
+    return remainToFlush.decrementAndGet() >= 0;
   }
 
   private void processFlushDatabase(IDatabaseMNode<ICachedMNode> 
updatedStorageGroupMNode)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index 0ef7a95d3c3..bf387b1ea64 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -23,20 +23,16 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,7 +68,7 @@ public class Scheduler {
     this.workerPool =
         IoTDBThreadPoolFactory.newFixedThreadPool(
             FLUSH_WORKER_NUM,
-            ThreadName.PBTREE_FLUSH_PROCESSOR.getName(),
+            ThreadName.PBTREE_WORKER_POOL.getName(),
             new ThreadPoolExecutor.DiscardPolicy());
     this.flushingRegionSet = flushingRegionSet;
     this.releaseFlushStrategy = releaseFlushStrategy;
@@ -104,30 +100,14 @@ public class Scheduler {
                               ISchemaFile file = store.getSchemaFile();
                               LockManager lockManager = store.getLockManager();
                               long startTime = System.currentTimeMillis();
-                              PBTreeFlushExecutor flushExecutor;
-                              IDatabaseMNode<ICachedMNode> dbNode =
-                                  
cacheManager.collectUpdatedStorageGroupMNodes();
-                              if (dbNode != null) {
-                                flushExecutor =
-                                    new PBTreeFlushExecutor(
-                                        dbNode, cacheManager, file, 
lockManager);
-                                try {
-                                  flushExecutor.flushDatabase();
-                                } catch (IOException e) {
-                                  LOGGER.warn(
-                                      "Error occurred during MTree flush, 
current SchemaRegionId is {} because {}",
-                                      regionId,
-                                      e.getMessage(),
-                                      e);
-                                }
-                              }
-                              flushExecutor =
-                                  new PBTreeFlushExecutor(
-                                      cacheManager.collectVolatileSubtrees(),
-                                      cacheManager,
-                                      file,
-                                      lockManager);
                               try {
+                                lockManager.globalReadLock();
+                                if (file == null) {
+                                  // store has been closed
+                                  return;
+                                }
+                                PBTreeFlushExecutor flushExecutor =
+                                    new PBTreeFlushExecutor(cacheManager, 
file, lockManager);
                                 flushExecutor.flushVolatileNodes();
                               } catch (MetadataException e) {
                                 LOGGER.warn(
@@ -148,6 +128,7 @@ public class Scheduler {
                                       time,
                                       regionId);
                                 }
+                                lockManager.globalReadUnlock();
                                 flushingRegionSet.remove(regionId);
                               }
                             },
@@ -204,27 +185,17 @@ public class Scheduler {
             ICacheManager cacheManager = store.getCacheManager();
             ISchemaFile file = store.getSchemaFile();
             LockManager lockManager = store.getLockManager();
-            List<ICachedMNode> nodesToFlush = new ArrayList<>();
-            PBTreeFlushExecutor flushExecutor;
             long startTime = System.currentTimeMillis();
             try {
-              IDatabaseMNode<ICachedMNode> dbNode = 
cacheManager.collectUpdatedStorageGroupMNodes();
-              if (dbNode != null) {
-                flushExecutor = new PBTreeFlushExecutor(dbNode, cacheManager, 
file, lockManager);
-                flushExecutor.flushDatabase();
-                remainToFlush.decrementAndGet();
-              }
-              Iterator<ICachedMNode> volatileSubtrees = 
cacheManager.collectVolatileSubtrees();
-              while (volatileSubtrees.hasNext()) {
-                nodesToFlush.add(volatileSubtrees.next());
-                if (nodesToFlush.size() > remainToFlush.get()) {
-                  break;
-                }
+              lockManager.globalReadLock();
+              if (file == null) {
+                // store has been closed
+                return;
               }
-              flushExecutor =
-                  new PBTreeFlushExecutor(nodesToFlush.iterator(), 
cacheManager, file, lockManager);
+              PBTreeFlushExecutor flushExecutor =
+                  new PBTreeFlushExecutor(remainToFlush, cacheManager, file, 
lockManager);
               flushExecutor.flushVolatileNodes();
-            } catch (MetadataException | IOException e) {
+            } catch (MetadataException e) {
               LOGGER.warn(
                   "Error occurred during MTree flush, current SchemaRegionId 
is {} because {}",
                   regionId,
@@ -237,7 +208,7 @@ public class Scheduler {
               } else {
                 LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion 
{}", time, regionId);
               }
-              remainToFlush.addAndGet(-nodesToFlush.size());
+              lockManager.globalReadUnlock();
               flushingRegionSet.remove(regionId);
             }
           });
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index dbc26565b85..423e3c57b0c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -64,11 +64,11 @@ public enum ThreadName {
   TIMED_FLUSH_UNSEQ_MEMTABLE("Timed-Flush-Unseq-Memtable"),
   // -------------------------- SchemaEngine --------------------------
   SCHEMA_REGION_RELEASE_PROCESSOR("SchemaRegion-Release-Task-Processor"),
-  SCHEMA_REGION_RECOVER_TASK("SchemaRegion-recover-task"),
+  SCHEMA_REGION_RECOVER_TASK("SchemaRegion-Recover-Task"),
   SCHEMA_FORCE_MLOG("SchemaEngine-TimedForceMLog-Thread"),
   PBTREE_RELEASE_MONITOR("PBTree-Release-Task-Monitor"),
   PBTREE_FLUSH_MONITOR("PBTree-Flush-Monitor"),
-  PBTREE_FLUSH_PROCESSOR("PBTree-Flush-Processor"),
+  PBTREE_WORKER_POOL("PBTree-Worker-Pool"),
   // -------------------------- ClientService --------------------------
   CLIENT_RPC_SERVICE("ClientRPC-Service"),
   CLIENT_RPC_PROCESSOR("ClientRPC-Processor"),
@@ -223,7 +223,7 @@ public enum ThreadName {
               PBTREE_RELEASE_MONITOR,
               SCHEMA_FORCE_MLOG,
               PBTREE_FLUSH_MONITOR,
-              PBTREE_FLUSH_PROCESSOR));
+              PBTREE_WORKER_POOL));
 
   private static final Set<ThreadName> clientServiceThreadNames =
       new HashSet<>(Arrays.asList(CLIENT_RPC_SERVICE, CLIENT_RPC_PROCESSOR));

Reply via email to