This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 020b8a52885 Extract common code and fix PBTree UT (#11890)
020b8a52885 is described below

commit 020b8a52885e8bc98db728291f1cd46f437484b1
Author: Chen YZ <[email protected]>
AuthorDate: Fri Jan 12 18:11:55 2024 +0800

    Extract common code and fix PBTree UT (#11890)
---
 .../mtree/impl/pbtree/flush/Scheduler.java         | 150 ++++++++++-----------
 .../impl/pbtree/memory/ReleaseFlushMonitor.java    |   4 +-
 .../schemaRegion/SchemaStatisticsTest.java         |   2 +-
 3 files changed, 73 insertions(+), 83 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index 579db96c5e8..5a22b23492b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -75,12 +75,60 @@ public class Scheduler {
     this.releaseFlushStrategy = releaseFlushStrategy;
   }
 
+  private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger remainToFlush) {
+    IMemoryManager memoryManager = store.getMemoryManager();
+    ISchemaFile file = store.getSchemaFile();
+    LockManager lockManager = store.getLockManager();
+    long startTime = System.currentTimeMillis();
+    AtomicLong flushNodeNum = new AtomicLong(0);
+    AtomicLong flushMemSize = new AtomicLong(0);
+    try {
+      PBTreeFlushExecutor flushExecutor;
+      if (remainToFlush == null) {
+        flushExecutor = new PBTreeFlushExecutor(memoryManager, file, lockManager);
+      } else {
+        flushExecutor = new PBTreeFlushExecutor(remainToFlush, memoryManager, file, lockManager);
+      }
+      flushExecutor.flushVolatileNodes(flushNodeNum, flushMemSize);
+    } catch (MetadataException e) {
+      LOGGER.warn(
+          "Error occurred during MTree flush, current SchemaRegionId is {} because {}",
+          regionId,
+          e.getMessage(),
+          e);
+    } finally {
+      long time = System.currentTimeMillis() - startTime;
+      if (time > 10_000) {
+        LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time, regionId);
+      } else {
+        LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", time, regionId);
+      }
+      store.recordFlushMetrics(time, flushNodeNum.get(), flushMemSize.get());
+      flushingRegionSet.remove(regionId);
+    }
+  }
+
+  private void executeRelease(CachedMTreeStore store, boolean force) {
+    AtomicLong releaseNodeNum = new AtomicLong(0);
+    AtomicLong releaseMemorySize = new AtomicLong(0);
+    long startTime = System.currentTimeMillis();
+    while (force || releaseFlushStrategy.isExceedReleaseThreshold()) {
+      // store try to release memory if not exceed release threshold
+      if (store.executeMemoryRelease(releaseNodeNum, releaseMemorySize)) {
+        // if store can not release memory, break
+        break;
+      }
+    }
+    store.recordReleaseMetrics(
+        System.currentTimeMillis() - startTime, releaseNodeNum.get(), releaseMemorySize.get());
+  }
+
   /**
    * Force flush all volatile subtrees and updated database MNodes to disk. After flushing, the
    * MNodes will be placed into node cache. This method will return synchronously after all stores
    * are successfully flushed.
    */
-  public synchronized CompletableFuture<Void> forceFlushAll() {
+  public synchronized CompletableFuture<Void> scheduleFlushAll() {
     List<Map.Entry<Integer, CachedMTreeStore>> flushEngineList = new ArrayList<>();
     for (Map.Entry<Integer, CachedMTreeStore> entry : regionToStore.entrySet()) {
       if (flushingRegionSet.contains(entry.getKey())) {
@@ -101,43 +149,17 @@ public class Scheduler {
                             // store has been closed
                             return;
                           }
-                          IMemoryManager memoryManager = store.getMemoryManager();
-                          ISchemaFile file = store.getSchemaFile();
                           LockManager lockManager = store.getLockManager();
-                          long startTime = System.currentTimeMillis();
-                          AtomicLong flushNodeNum = new AtomicLong(0);
-                          AtomicLong flushMemSize = new AtomicLong(0);
+                          lockManager.globalReadLock();
+                          if (!regionToStore.containsKey(regionId)) {
+                            // double check store have not been closed
+                            return;
+                          }
                           try {
-                            lockManager.globalReadLock();
-                            if (!regionToStore.containsKey(regionId)) {
-                              // double check store have not been closed
-                              return;
-                            }
-                            PBTreeFlushExecutor flushExecutor =
-                                new PBTreeFlushExecutor(memoryManager, file, lockManager);
-                            flushExecutor.flushVolatileNodes(flushNodeNum, flushMemSize);
-                          } catch (MetadataException e) {
-                            LOGGER.warn(
-                                "Error occurred during MTree flush, current SchemaRegionId is {} because {}",
-                                regionId,
-                                e.getMessage(),
-                                e);
+                            executeFlush(store, regionId, null);
+                            executeRelease(store, false);
                           } finally {
-                            long time = System.currentTimeMillis() - startTime;
-                            if (time > 10_000) {
-                              LOGGER.info(
-                                  "It takes {}ms to flush MTree in SchemaRegion {}",
-                                  time,
-                                  regionId);
-                            } else {
-                              LOGGER.debug(
-                                  "It takes {}ms to flush MTree in SchemaRegion {}",
-                                  time,
-                                  regionId);
-                            }
-                            store.recordFlushMetrics(time, flushNodeNum.get(), flushMemSize.get());
                             lockManager.globalReadUnlock();
-                            flushingRegionSet.remove(regionId);
                           }
                         },
                         workerPool))
@@ -159,28 +181,18 @@ public class Scheduler {
                             () -> {
                               int regionId = entry.getKey();
                               CachedMTreeStore store = entry.getValue();
+                              if (store == null) {
+                                // store has been closed
+                                return;
+                              }
+                              LockManager lockManager = store.getLockManager();
+                              lockManager.globalReadLock(true);
                               if (!regionToStore.containsKey(regionId)) {
                                 // double check store have not been closed
                                 return;
                               }
-                              LockManager lockManager = store.getLockManager();
                               try {
-                                lockManager.globalReadLock(true);
-                                AtomicLong releaseNodeNum = new AtomicLong(0);
-                                AtomicLong releaseMemorySize = new AtomicLong(0);
-                                long startTime = System.currentTimeMillis();
-                                while (force || releaseFlushStrategy.isExceedReleaseThreshold()) {
-                                  // store try to release memory if not exceed release threshold
-                                  if (store.executeMemoryRelease(
-                                      releaseNodeNum, releaseMemorySize)) {
-                                    // if store can not release memory, break
-                                    break;
-                                  }
-                                }
-                                store.recordReleaseMetrics(
-                                    System.currentTimeMillis() - startTime,
-                                    releaseNodeNum.get(),
-                                    releaseMemorySize.get());
+                                executeRelease(store, force);
                               } finally {
                                 lockManager.globalReadUnlock();
                               }
@@ -213,39 +225,17 @@ public class Scheduler {
               // store has been closed
               return;
             }
-            IMemoryManager memoryManager = store.getMemoryManager();
-            ISchemaFile file = store.getSchemaFile();
             LockManager lockManager = store.getLockManager();
-            long startTime = System.currentTimeMillis();
-            AtomicLong flushNodeNum = new AtomicLong(0);
-            AtomicLong flushMemSize = new AtomicLong(0);
+            lockManager.globalReadLock();
+            if (!regionToStore.containsKey(regionId)) {
+              // double check store have not been closed
+              return;
+            }
             try {
-              lockManager.globalReadLock();
-              if (!regionToStore.containsKey(regionId)) {
-                // double check store have not been closed
-                return;
-              }
-              PBTreeFlushExecutor flushExecutor =
-                  new PBTreeFlushExecutor(remainToFlush, memoryManager, file, lockManager);
-              flushExecutor.flushVolatileNodes(flushNodeNum, flushMemSize);
-            } catch (MetadataException e) {
-              LOGGER.warn(
-                  "Error occurred during MTree flush, current SchemaRegionId is {} because {}",
-                  regionId,
-                  e.getMessage(),
-                  e);
-            } catch (Throwable e) {
-              LOGGER.error("Error occurred during PBTree flush", e);
+
+              executeFlush(store, regionId, remainToFlush);
             } finally {
-              long time = System.currentTimeMillis() - startTime;
-              if (time > 10_000) {
-                LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", time, regionId);
-              } else {
-                LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", time, regionId);
-              }
-              store.recordFlushMetrics(time, flushNodeNum.get(), flushMemSize.get());
               lockManager.globalReadUnlock();
-              flushingRegionSet.remove(regionId);
             }
           });
       if (remainToFlush.get() <= 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
index cf367416968..81dbd816ddb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java
@@ -119,7 +119,7 @@ public class ReleaseFlushMonitor {
                 // 2. if it still exceeds release threshold, it will try to flush node buffer, then
                 // release node cache again
                 if (releaseFlushStrategy.isExceedReleaseThreshold()) {
-                  scheduler.forceFlushAll();
+                  scheduler.scheduleFlushAll();
                   regionToTraverserTime.values().forEach(RecordList::clear);
                 }
                 synchronized (blockObject) {
@@ -238,7 +238,7 @@ public class ReleaseFlushMonitor {
         }
       }
       if (needFlush) {
-        scheduler.forceFlushAll().join();
+        scheduler.scheduleFlushAll().join();
         scheduler.scheduleRelease(true);
       } else {
         break;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index da8d88b34c3..5b7c4dd6589 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -124,7 +124,7 @@ public class SchemaStatisticsTest extends 
AbstractSchemaRegionTest {
 
       IMNodeFactory<?> nodeFactory = MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
       // wait release and flush task
-      Thread.sleep(6500);
+      Thread.sleep(1000);
       // schemaRegion1
       IMNode<?> sg1 =
           nodeFactory.createDatabaseMNode(

Reply via email to