This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch pbtree_page_concurrency
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pbtree_page_concurrency by 
this push:
     new 035c224d6fd Implement PageLifecycleManager for global page control 
(#12008)
035c224d6fd is described below

commit 035c224d6fd801e91958cd16b89741995a1fd35f
Author: Chen YZ <[email protected]>
AuthorDate: Thu Feb 22 14:37:24 2024 +0800

    Implement PageLifecycleManager for global page control (#12008)
    
    * done
    
    * add license
---
 .../schemaengine/rescon/SchemaResourceManager.java |   9 ++
 .../mtree/impl/pbtree/CachedMTreeStore.java        |   2 +
 .../mtree/impl/pbtree/PBTreeFactory.java           |   3 +
 .../impl/pbtree/memory/page/FIFOPageContainer.java |  79 +++++++++
 .../impl/pbtree/memory/page/IPageContainer.java    |  39 +++++
 .../pbtree/memory/page/PageLifecycleManager.java   | 125 +++++++++++++++
 .../mtree/impl/pbtree/schemafile/ISchemaFile.java  |   3 +
 .../impl/pbtree/schemafile/MockSchemaFile.java     |   6 +
 .../mtree/impl/pbtree/schemafile/SchemaFile.java   |   4 +
 .../pbtree/schemafile/pagemgr/IPageManager.java    |   2 +
 .../pbtree/schemafile/pagemgr/PageManager.java     |   5 +
 .../impl/pbtree/schemafile/pagemgr/PagePool.java   |  22 +++
 .../db/metadata/page/PageLifecycleManagerTest.java | 176 +++++++++++++++++++++
 13 files changed, 475 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
index 0be0d252c69..309d27a9db8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
@@ -20,8 +20,11 @@
 package org.apache.iotdb.db.schemaengine.rescon;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.ReleaseFlushMonitor;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
 
 public class SchemaResourceManager {
 
@@ -33,6 +36,12 @@ public class SchemaResourceManager {
         .getSchemaEngineMode()
         .equals(SchemaEngineMode.PBTree.toString())) {
       initSchemaFileModeResource(engineStatistics);
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      PageLifecycleManager.getInstance()
+          .loadConfiguration(
+              config.getPbtreeCachePageNum() + config.getPbtreeBufferPageNum(),
+              config.getPbtreeCachePageNum(),
+              config.getPbtreeBufferPageNum());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 4b894a19a23..1fa955e1485 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.Lock
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemoryStatistics;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.IMemoryManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.ReleaseFlushMonitor;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.iterator.CachedTraverserIterator;
@@ -556,6 +557,7 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
         }
       }
       file = null;
+      PageLifecycleManager.getInstance().removePagePool(schemaRegionId);
     } finally {
       lockManager.globalWriteUnlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
index 8761cc35b32..9e32e84c3fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontro
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.IMemoryManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.MemoryManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.ReleaseFlushMonitor;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
 
@@ -91,6 +92,8 @@ public class PBTreeFactory {
     regionStatistics.setMemoryManager(memoryManager);
 
     ReleaseFlushMonitor releaseFlushMonitor = 
ReleaseFlushMonitor.getInstance();
+    PageLifecycleManager.getInstance()
+        .registerPagePool(schemaRegionId, 
schemaFile.getPageManager().getPagePool());
     CachedMTreeStore cachedMTreeStore =
         new CachedMTreeStore(
             schemaRegionId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/FIFOPageContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/FIFOPageContainer.java
new file mode 100644
index 00000000000..ea32c25bb88
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/FIFOPageContainer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+public class FIFOPageContainer implements IPageContainer {
+
+  private final ConcurrentLinkedDeque<Pair<Integer, ISchemaPage>> pageQueue =
+      new ConcurrentLinkedDeque<>();
+  private final AtomicInteger totalSize =
+      new AtomicInteger(0); // total size of pages, because calculate size is 
too expensive
+
+  @Override
+  public void put(int regionId, ISchemaPage page) {
+    pageQueue.add(new Pair<>(regionId, page));
+    totalSize.incrementAndGet();
+  }
+
+  @Override
+  public void iterateToRemove(Predicate<Pair<Integer, ISchemaPage>> predicate, 
int targetSize) {
+    Iterator<Pair<Integer, ISchemaPage>> iterator = pageQueue.iterator();
+    while (iterator.hasNext()) {
+      if (totalSize.get() <= targetSize) {
+        break; // consider concurrent situation
+      }
+      Pair<Integer, ISchemaPage> pair = iterator.next();
+      if (pair.right.getRefCnt().get() == 0 && predicate.test(pair)) {
+        iterator.remove();
+        totalSize.decrementAndGet();
+        if (totalSize.get() <= targetSize) {
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    return totalSize.get();
+  }
+
+  @TestOnly
+  @Override
+  public Iterator<Pair<Integer, ISchemaPage>> iterator() {
+    return pageQueue.iterator();
+  }
+
+  @TestOnly
+  @Override
+  public void clear() {
+    pageQueue.clear();
+    totalSize.set(0);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/IPageContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/IPageContainer.java
new file mode 100644
index 00000000000..d7e39457915
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/IPageContainer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page;
+
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+public interface IPageContainer {
+
+  void put(int regionId, ISchemaPage page);
+
+  void iterateToRemove(Predicate<Pair<Integer, ISchemaPage>> predicate, int 
targetSize);
+
+  int size();
+
+  Iterator<Pair<Integer, ISchemaPage>> iterator();
+
+  void clear();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/PageLifecycleManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/PageLifecycleManager.java
new file mode 100644
index 00000000000..3c45b5a31e0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/PageLifecycleManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.PagePool;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is used to manage the lifecycle of pages in memory. There are 
two kinds of pages in
+ * memory: cached pages and volatile pages. There is a recommended capacity 
for both types of pages.
+ * For example, total capacity is 5, recommended capacity for cached pages is 
3, recommended
+ * capacity for volatile pages is 2. When system requests a cached page:
+ *
+ * <ol>
+ *   <li>If the current number of cached pages is less than 3, then we can 
allocate a new cached
+ *       page. A volatile node needs to be flushed if the total capacity is 
exceeded.
+ *   <li>If the current number of cached pages is greater than or equal to 3 
and the total capacity
+ *       is full, a cached page needs to be evicted for allocation.
+ *   <li>If the current number of cache pages is greater than or equal to 3 
and the total capacity
+ *       is not full, you can borrow the space of a volatile page and allocate 
a cache page
+ *       directly.
+ * </ol>
+ */
+public class PageLifecycleManager {
+  /* configuration */
+  private int capacity;
+  private int cachedCapacity;
+  private int volatileCapacity;
+
+  /* data */
+  // TODO(PBTree-Page-Concurrent): 这里也可以不是 PagePool,改为 IPageManager 等其他数据结构
+  private final Map<Integer, PagePool> regionToPagePoolMap = new 
ConcurrentHashMap<>();
+  private final IPageContainer cachedContainer = new FIFOPageContainer();
+  private final IPageContainer volatileContainer = new FIFOPageContainer();
+
+  public void loadConfiguration(int capacity, int cachedCapacity, int 
volatileCapacity) {
+    this.capacity = capacity;
+    this.cachedCapacity = cachedCapacity;
+    this.volatileCapacity = volatileCapacity;
+  }
+
+  public void registerPagePool(int regionId, PagePool pagePool) {
+    regionToPagePoolMap.put(regionId, pagePool);
+  }
+
+  public void removePagePool(int regionId) {
+    regionToPagePoolMap.remove(regionId);
+  }
+
+  // TODO(PBTree-Page-Concurrent): 有新cached页时,调一次这个方法
+  public void putCachedPage(int regionId, ISchemaPage schemaPage) {
+    cachedContainer.put(regionId, schemaPage);
+    checkCapacity();
+  }
+
+  // TODO(PBTree-Page-Concurrent): 有新的脏页时,调一次这个方法
+  public void putVolatilePage(int regionId, ISchemaPage schemaPage) {
+    volatileContainer.put(regionId, schemaPage);
+    checkCapacity();
+  }
+
+  // TODO(PBTree-Page-Concurrent): 读写结束以后,调一次这个方法
+  public void checkCapacity() {
+    if (cachedContainer.size() + volatileContainer.size() > capacity) {
+      if (cachedContainer.size() > cachedCapacity) {
+        // try to evict cached pages
+        cachedContainer.iterateToRemove(
+            i -> regionToPagePoolMap.get(i.left).evict(i.right), 
cachedCapacity);
+      } else {
+        // try to flush volatile pages
+        volatileContainer.iterateToRemove(
+            i -> regionToPagePoolMap.get(i.left).flush(i.right), 
volatileCapacity);
+      }
+    }
+  }
+
+  @TestOnly
+  public IPageContainer getCachedContainer() {
+    return cachedContainer;
+  }
+
+  @TestOnly
+  public IPageContainer getVolatileContainer() {
+    return volatileContainer;
+  }
+
+  @TestOnly
+  public void clear() {
+    regionToPagePoolMap.clear();
+    cachedContainer.clear();
+    volatileContainer.clear();
+  }
+
+  // singleton
+  private PageLifecycleManager() {}
+
+  private static class PageLifecycleManagerHolder {
+    private static final PageLifecycleManager INSTANCE = new 
PageLifecycleManager();
+  }
+
+  public static PageLifecycleManager getInstance() {
+    return PageLifecycleManager.PageLifecycleManagerHolder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
index 477c14148aa..e3bc74d1b22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.IPageManager;
 
 import java.io.File;
 import java.io.IOException;
@@ -64,4 +65,6 @@ public interface ISchemaFile {
   Iterator<ICachedMNode> getChildren(ICachedMNode parent) throws 
MetadataException, IOException;
 
   boolean createSnapshot(File snapshotDir);
+
+  IPageManager getPageManager();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
index 6dd9491df39..da5b187943e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.CachedMNodeContainer;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.IPageManager;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
 
 import java.io.File;
@@ -103,6 +104,11 @@ public class MockSchemaFile implements ISchemaFile {
     return false;
   }
 
+  @Override
+  public IPageManager getPageManager() {
+    return null;
+  }
+
   @Override
   public synchronized void writeMNode(ICachedMNode parent) {
     ICachedMNodeContainer container = getCachedMNodeContainer(parent);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 1b59a45cf53..91ef7d8ab77 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -453,6 +453,10 @@ public class SchemaFile implements ISchemaFile {
     pageManager.setMetric(metric);
   }
 
+  public IPageManager getPageManager() {
+    return pageManager;
+  }
+
   // endregion
 
   // region Snapshot
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
index bb9baf86fa7..09742639904 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
@@ -60,4 +60,6 @@ public interface IPageManager {
   void inspect(PrintWriter pw) throws IOException, MetadataException;
 
   void setMetric(SchemaRegionCachedMetric metric);
+
+  PagePool getPagePool();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index ad92a332d57..1b313fd7879 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -626,6 +626,11 @@ public abstract class PageManager implements IPageManager {
     cxt.markDirty(page);
   }
 
+  @Override
+  public PagePool getPagePool() {
+    return pagePool;
+  }
+
   // endregion
 
   // region Inner Utilities
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
index 7cd9d4d5860..af95eb61484 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
@@ -149,4 +149,26 @@ public class PagePool {
       }
     }
   }
+
+  /**
+   * Evict cached page from page pool.
+   *
+   * @param page page to be evicted
+   * @return true if evicted successfully, false otherwise
+   */
+  public boolean evict(ISchemaPage page) {
+    // TODO(PBTree-Page-Concurrent): 这里实现 evict cached page 的逻辑,这个接口也可以考虑移到 
IPageManager 中
+    return true;
+  }
+
+  /**
+   * Flush volatile page to disk.
+   *
+   * @param page page to be flushed
+   * @return true if flushed successfully, false otherwise
+   */
+  public boolean flush(ISchemaPage page) {
+    // TODO(PBTree-Page-Concurrent): 这里实现 flush volatile page 的逻辑,这个接口也可以考虑移到 
IPageManager 中
+    return true;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/page/PageLifecycleManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/page/PageLifecycleManagerTest.java
new file mode 100644
index 00000000000..b629e392c90
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/page/PageLifecycleManagerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.page;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.PagePool;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PageLifecycleManagerTest {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final PageLifecycleManager manager = 
PageLifecycleManager.getInstance();
+
+  @Before
+  public void setUp() throws Exception {
+    manager.loadConfiguration(5, 2, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    manager.clear();
+    manager.loadConfiguration(
+        config.getPbtreeCachePageNum() + config.getPbtreeBufferPageNum(),
+        config.getPbtreeCachePageNum(),
+        config.getPbtreeBufferPageNum());
+  }
+
+  // Test proportion control and fifo strategy
+  @Test
+  public void testFIFOCacheBufferProportion() {
+    PagePool pool1 = mock(PagePool.class);
+    manager.registerPagePool(1, pool1);
+    List<ISchemaPage> pageList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      ISchemaPage page = mock(ISchemaPage.class);
+      when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+      when(page.getPageIndex()).thenReturn(i);
+      pageList.add(page);
+      when(pool1.evict(page)).thenReturn(true);
+      when(pool1.flush(page)).thenReturn(true);
+    }
+    for (int i = 0; i < 3; i++) {
+      ISchemaPage page = pageList.get(i);
+      manager.putCachedPage(1, page);
+    }
+    Assert.assertEquals(3, manager.getCachedContainer().size());
+    Assert.assertEquals(0, manager.getVolatileContainer().size());
+    for (int i = 3; i < 5; i++) {
+      ISchemaPage page = pageList.get(i);
+      manager.putVolatilePage(1, page);
+    }
+    Assert.assertEquals(3, manager.getCachedContainer().size());
+    Assert.assertEquals(2, manager.getVolatileContainer().size());
+    for (int i = 5; i < 7; i++) {
+      ISchemaPage page = pageList.get(i);
+      manager.putCachedPage(1, page);
+    }
+    Assert.assertEquals(3, manager.getCachedContainer().size());
+    Assert.assertEquals(2, manager.getVolatileContainer().size());
+    Iterator<Pair<Integer, ISchemaPage>> iterator = 
manager.getCachedContainer().iterator();
+    Assert.assertEquals(2, iterator.next().right.getPageIndex());
+    Assert.assertEquals(5, iterator.next().right.getPageIndex());
+    Assert.assertEquals(6, iterator.next().right.getPageIndex());
+    Assert.assertFalse(iterator.hasNext());
+
+    manager.putVolatilePage(1, pageList.get(7));
+    Assert.assertEquals(2, manager.getCachedContainer().size());
+    Assert.assertEquals(3, manager.getVolatileContainer().size());
+    manager.putVolatilePage(1, pageList.get(8));
+    Assert.assertEquals(2, manager.getCachedContainer().size());
+    Assert.assertEquals(3, manager.getVolatileContainer().size());
+    manager.putCachedPage(1, pageList.get(9));
+    Assert.assertEquals(2, manager.getCachedContainer().size());
+    Assert.assertEquals(3, manager.getVolatileContainer().size());
+    iterator = manager.getCachedContainer().iterator();
+    Assert.assertEquals(6, iterator.next().right.getPageIndex());
+    Assert.assertEquals(9, iterator.next().right.getPageIndex());
+    Assert.assertFalse(iterator.hasNext());
+    iterator = manager.getVolatileContainer().iterator();
+    Assert.assertEquals(4, iterator.next().right.getPageIndex());
+    Assert.assertEquals(7, iterator.next().right.getPageIndex());
+    Assert.assertEquals(8, iterator.next().right.getPageIndex());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testLockPage() {
+    PagePool pool1 = mock(PagePool.class);
+    PagePool pool2 = mock(PagePool.class);
+    manager.registerPagePool(1, pool1);
+    manager.registerPagePool(2, pool2);
+
+    // [1, 1] is locked volatile page
+    ISchemaPage page = mock(ISchemaPage.class);
+    when(page.getRefCnt()).thenReturn(new AtomicInteger(1));
+    when(page.getPageIndex()).thenReturn(1);
+    when(pool1.flush(page)).thenReturn(false);
+    manager.putVolatilePage(1, page);
+
+    // other page is not locked
+    for (int i = 0; i < 2; i++) {
+      page = mock(ISchemaPage.class);
+      when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+      when(page.getPageIndex()).thenReturn(2 + i);
+      when(pool2.flush(page)).thenReturn(true);
+      manager.putVolatilePage(2, page);
+    }
+    for (int i = 0; i < 2; i++) {
+      page = mock(ISchemaPage.class);
+      when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+      when(page.getPageIndex()).thenReturn(4 + i);
+      when(pool1.evict(page)).thenReturn(true);
+      manager.putCachedPage(1, page);
+    }
+
+    // check current status
+    Iterator<Pair<Integer, ISchemaPage>> iterator = 
manager.getVolatileContainer().iterator();
+    int idx = 1;
+    while (iterator.hasNext()) {
+      Pair<Integer, ISchemaPage> pair = iterator.next();
+      Assert.assertEquals((idx == 2 || idx == 3) ? 2 : 1, 
pair.left.intValue());
+      Assert.assertEquals(idx++, pair.right.getPageIndex());
+    }
+    iterator = manager.getCachedContainer().iterator();
+    while (iterator.hasNext()) {
+      Pair<Integer, ISchemaPage> pair = iterator.next();
+      Assert.assertEquals(1, pair.left.intValue());
+      Assert.assertEquals(idx++, pair.right.getPageIndex());
+    }
+
+    // add a new volatile page, the locked page should be evicted
+    page = mock(ISchemaPage.class);
+    when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+    when(page.getPageIndex()).thenReturn(6);
+    when(pool1.flush(page)).thenReturn(true);
+    manager.putVolatilePage(1, page);
+    iterator = manager.getVolatileContainer().iterator();
+    Assert.assertEquals(1, iterator.next().right.getPageIndex());
+    Assert.assertEquals(3, iterator.next().right.getPageIndex());
+    Assert.assertEquals(6, iterator.next().right.getPageIndex());
+    Assert.assertFalse(iterator.hasNext());
+  }
+}

Reply via email to