This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch pbtree_page_concurrency
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pbtree_page_concurrency by
this push:
new 035c224d6fd Implement PageLifecycleManager for global page control
(#12008)
035c224d6fd is described below
commit 035c224d6fd801e91958cd16b89741995a1fd35f
Author: Chen YZ <[email protected]>
AuthorDate: Thu Feb 22 14:37:24 2024 +0800
Implement PageLifecycleManager for global page control (#12008)
* done
* add license
---
.../schemaengine/rescon/SchemaResourceManager.java | 9 ++
.../mtree/impl/pbtree/CachedMTreeStore.java | 2 +
.../mtree/impl/pbtree/PBTreeFactory.java | 3 +
.../impl/pbtree/memory/page/FIFOPageContainer.java | 79 +++++++++
.../impl/pbtree/memory/page/IPageContainer.java | 39 +++++
.../pbtree/memory/page/PageLifecycleManager.java | 125 +++++++++++++++
.../mtree/impl/pbtree/schemafile/ISchemaFile.java | 3 +
.../impl/pbtree/schemafile/MockSchemaFile.java | 6 +
.../mtree/impl/pbtree/schemafile/SchemaFile.java | 4 +
.../pbtree/schemafile/pagemgr/IPageManager.java | 2 +
.../pbtree/schemafile/pagemgr/PageManager.java | 5 +
.../impl/pbtree/schemafile/pagemgr/PagePool.java | 22 +++
.../db/metadata/page/PageLifecycleManagerTest.java | 176 +++++++++++++++++++++
13 files changed, 475 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
index 0be0d252c69..309d27a9db8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/SchemaResourceManager.java
@@ -20,8 +20,11 @@
package org.apache.iotdb.db.schemaengine.rescon;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.ReleaseFlushMonitor;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
public class SchemaResourceManager {
@@ -33,6 +36,12 @@ public class SchemaResourceManager {
.getSchemaEngineMode()
.equals(SchemaEngineMode.PBTree.toString())) {
initSchemaFileModeResource(engineStatistics);
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ PageLifecycleManager.getInstance()
+ .loadConfiguration(
+ config.getPbtreeCachePageNum() + config.getPbtreeBufferPageNum(),
+ config.getPbtreeCachePageNum(),
+ config.getPbtreeBufferPageNum());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 4b894a19a23..1fa955e1485 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.Lock
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemoryStatistics;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.IMemoryManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.ReleaseFlushMonitor;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.iterator.CachedTraverserIterator;
@@ -556,6 +557,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
}
file = null;
+ PageLifecycleManager.getInstance().removePagePool(schemaRegionId);
} finally {
lockManager.globalWriteUnlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
index 8761cc35b32..9e32e84c3fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/PBTreeFactory.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontro
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.IMemoryManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.MemoryManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.ReleaseFlushMonitor;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
@@ -91,6 +92,8 @@ public class PBTreeFactory {
regionStatistics.setMemoryManager(memoryManager);
ReleaseFlushMonitor releaseFlushMonitor =
ReleaseFlushMonitor.getInstance();
+ PageLifecycleManager.getInstance()
+ .registerPagePool(schemaRegionId,
schemaFile.getPageManager().getPagePool());
CachedMTreeStore cachedMTreeStore =
new CachedMTreeStore(
schemaRegionId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/FIFOPageContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/FIFOPageContainer.java
new file mode 100644
index 00000000000..ea32c25bb88
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/FIFOPageContainer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+public class FIFOPageContainer implements IPageContainer {
+
+ private final ConcurrentLinkedDeque<Pair<Integer, ISchemaPage>> pageQueue =
+ new ConcurrentLinkedDeque<>();
+ private final AtomicInteger totalSize =
+ new AtomicInteger(0); // number of queued pages, tracked separately because computing the
queue's size is too expensive
+
+ @Override
+ public void put(int regionId, ISchemaPage page) {
+ pageQueue.add(new Pair<>(regionId, page));
+ totalSize.incrementAndGet();
+ }
+
+ @Override
+ public void iterateToRemove(Predicate<Pair<Integer, ISchemaPage>> predicate,
int targetSize) {
+ Iterator<Pair<Integer, ISchemaPage>> iterator = pageQueue.iterator();
+ while (iterator.hasNext()) {
+ if (totalSize.get() <= targetSize) {
+ break; // concurrent callers may already have brought the size down to the target
+ }
+ Pair<Integer, ISchemaPage> pair = iterator.next();
+ if (pair.right.getRefCnt().get() == 0 && predicate.test(pair)) {
+ iterator.remove();
+ totalSize.decrementAndGet();
+ if (totalSize.get() <= targetSize) {
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public int size() {
+ return totalSize.get();
+ }
+
+ @TestOnly
+ @Override
+ public Iterator<Pair<Integer, ISchemaPage>> iterator() {
+ return pageQueue.iterator();
+ }
+
+ @TestOnly
+ @Override
+ public void clear() {
+ pageQueue.clear();
+ totalSize.set(0);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/IPageContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/IPageContainer.java
new file mode 100644
index 00000000000..d7e39457915
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/IPageContainer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+public interface IPageContainer {
+
+ void put(int regionId, ISchemaPage page);
+
+ void iterateToRemove(Predicate<Pair<Integer, ISchemaPage>> predicate, int
targetSize);
+
+ int size();
+
+ Iterator<Pair<Integer, ISchemaPage>> iterator();
+
+ void clear();
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/PageLifecycleManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/PageLifecycleManager.java
new file mode 100644
index 00000000000..3c45b5a31e0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/page/PageLifecycleManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.PagePool;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is used to manage the lifecycle of pages in memory. There are
two kinds of pages in
+ * memory: cached pages and volatile pages. There is a recommended capacity
for both types of pages.
+ * For example, total capacity is 5, recommended capacity for cached pages is
3, recommended
+ * capacity for volatile pages is 2. When system requests a cached page:
+ *
+ * <ol>
+ * <li>If the current number of cached pages is less than 3, then we can
allocate a new cached
+ * page. A volatile page needs to be flushed if the total capacity is
exceeded.
+ * <li>If the current number of cached pages is greater than or equal to 3
and the total capacity
+ * is full, a cached page needs to be evicted for allocation.
+ * <li>If the current number of cached pages is greater than or equal to 3
and the total capacity
+ * is not full, you can borrow the space of a volatile page and allocate
a cached page
+ * directly.
+ * </ol>
+ */
+public class PageLifecycleManager {
+ /* configuration */
+ private int capacity;
+ private int cachedCapacity;
+ private int volatileCapacity;
+
+ /* data */
+ // TODO(PBTree-Page-Concurrent): this need not be a PagePool; it could be an IPageManager or another data structure
+ private final Map<Integer, PagePool> regionToPagePoolMap = new
ConcurrentHashMap<>();
+ private final IPageContainer cachedContainer = new FIFOPageContainer();
+ private final IPageContainer volatileContainer = new FIFOPageContainer();
+
+ public void loadConfiguration(int capacity, int cachedCapacity, int
volatileCapacity) {
+ this.capacity = capacity;
+ this.cachedCapacity = cachedCapacity;
+ this.volatileCapacity = volatileCapacity;
+ }
+
+ public void registerPagePool(int regionId, PagePool pagePool) {
+ regionToPagePoolMap.put(regionId, pagePool);
+ }
+
+ public void removePagePool(int regionId) {
+ regionToPagePoolMap.remove(regionId);
+ }
+
+ // TODO(PBTree-Page-Concurrent): call this method once whenever there is a new cached page
+ public void putCachedPage(int regionId, ISchemaPage schemaPage) {
+ cachedContainer.put(regionId, schemaPage);
+ checkCapacity();
+ }
+
+ // TODO(PBTree-Page-Concurrent): call this method once whenever there is a new dirty page
+ public void putVolatilePage(int regionId, ISchemaPage schemaPage) {
+ volatileContainer.put(regionId, schemaPage);
+ checkCapacity();
+ }
+
+ // TODO(PBTree-Page-Concurrent): call this method once after a read or write finishes
+ public void checkCapacity() {
+ if (cachedContainer.size() + volatileContainer.size() > capacity) {
+ if (cachedContainer.size() > cachedCapacity) {
+ // try to evict cached pages
+ cachedContainer.iterateToRemove(
+ i -> regionToPagePoolMap.get(i.left).evict(i.right),
cachedCapacity);
+ } else {
+ // try to flush volatile pages
+ volatileContainer.iterateToRemove(
+ i -> regionToPagePoolMap.get(i.left).flush(i.right),
volatileCapacity);
+ }
+ }
+ }
+
+ @TestOnly
+ public IPageContainer getCachedContainer() {
+ return cachedContainer;
+ }
+
+ @TestOnly
+ public IPageContainer getVolatileContainer() {
+ return volatileContainer;
+ }
+
+ @TestOnly
+ public void clear() {
+ regionToPagePoolMap.clear();
+ cachedContainer.clear();
+ volatileContainer.clear();
+ }
+
+ // singleton
+ private PageLifecycleManager() {}
+
+ private static class PageLifecycleManagerHolder {
+ private static final PageLifecycleManager INSTANCE = new
PageLifecycleManager();
+ }
+
+ public static PageLifecycleManager getInstance() {
+ return PageLifecycleManager.PageLifecycleManagerHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
index 477c14148aa..e3bc74d1b22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaFile.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.IPageManager;
import java.io.File;
import java.io.IOException;
@@ -64,4 +65,6 @@ public interface ISchemaFile {
Iterator<ICachedMNode> getChildren(ICachedMNode parent) throws
MetadataException, IOException;
boolean createSnapshot(File snapshotDir);
+
+ IPageManager getPageManager();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
index 6dd9491df39..da5b187943e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.CachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.IPageManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import java.io.File;
@@ -103,6 +104,11 @@ public class MockSchemaFile implements ISchemaFile {
return false;
}
+ @Override
+ public IPageManager getPageManager() {
+ return null;
+ }
+
@Override
public synchronized void writeMNode(ICachedMNode parent) {
ICachedMNodeContainer container = getCachedMNodeContainer(parent);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 1b59a45cf53..91ef7d8ab77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -453,6 +453,10 @@ public class SchemaFile implements ISchemaFile {
pageManager.setMetric(metric);
}
+ public IPageManager getPageManager() {
+ return pageManager;
+ }
+
// endregion
// region Snapshot
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
index bb9baf86fa7..09742639904 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
@@ -60,4 +60,6 @@ public interface IPageManager {
void inspect(PrintWriter pw) throws IOException, MetadataException;
void setMetric(SchemaRegionCachedMetric metric);
+
+ PagePool getPagePool();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index ad92a332d57..1b313fd7879 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -626,6 +626,11 @@ public abstract class PageManager implements IPageManager {
cxt.markDirty(page);
}
+ @Override
+ public PagePool getPagePool() {
+ return pagePool;
+ }
+
// endregion
// region Inner Utilities
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
index 7cd9d4d5860..af95eb61484 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
@@ -149,4 +149,26 @@ public class PagePool {
}
}
}
+
+ /**
+ * Evict cached page from page pool.
+ *
+ * @param page page to be evicted
+ * @return true if evicted successfully, false otherwise
+ */
+ public boolean evict(ISchemaPage page) {
+ // TODO(PBTree-Page-Concurrent): implement the logic for evicting a cached page here; this method could also be moved into
IPageManager
+ return true;
+ }
+
+ /**
+ * Flush volatile page to disk.
+ *
+ * @param page page to be flushed
+ * @return true if flushed successfully, false otherwise
+ */
+ public boolean flush(ISchemaPage page) {
+ // TODO(PBTree-Page-Concurrent): implement the logic for flushing a volatile page here; this method could also be moved into
IPageManager
+ return true;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/page/PageLifecycleManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/page/PageLifecycleManagerTest.java
new file mode 100644
index 00000000000..b629e392c90
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/page/PageLifecycleManagerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.page;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.page.PageLifecycleManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.PagePool;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PageLifecycleManagerTest {
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final PageLifecycleManager manager =
PageLifecycleManager.getInstance();
+
+ @Before
+ public void setUp() throws Exception {
+ manager.loadConfiguration(5, 2, 3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ manager.clear();
+ manager.loadConfiguration(
+ config.getPbtreeCachePageNum() + config.getPbtreeBufferPageNum(),
+ config.getPbtreeCachePageNum(),
+ config.getPbtreeBufferPageNum());
+ }
+
+ // Test proportion control and fifo strategy
+ @Test
+ public void testFIFOCacheBufferProportion() {
+ PagePool pool1 = mock(PagePool.class);
+ manager.registerPagePool(1, pool1);
+ List<ISchemaPage> pageList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ ISchemaPage page = mock(ISchemaPage.class);
+ when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+ when(page.getPageIndex()).thenReturn(i);
+ pageList.add(page);
+ when(pool1.evict(page)).thenReturn(true);
+ when(pool1.flush(page)).thenReturn(true);
+ }
+ for (int i = 0; i < 3; i++) {
+ ISchemaPage page = pageList.get(i);
+ manager.putCachedPage(1, page);
+ }
+ Assert.assertEquals(3, manager.getCachedContainer().size());
+ Assert.assertEquals(0, manager.getVolatileContainer().size());
+ for (int i = 3; i < 5; i++) {
+ ISchemaPage page = pageList.get(i);
+ manager.putVolatilePage(1, page);
+ }
+ Assert.assertEquals(3, manager.getCachedContainer().size());
+ Assert.assertEquals(2, manager.getVolatileContainer().size());
+ for (int i = 5; i < 7; i++) {
+ ISchemaPage page = pageList.get(i);
+ manager.putCachedPage(1, page);
+ }
+ Assert.assertEquals(3, manager.getCachedContainer().size());
+ Assert.assertEquals(2, manager.getVolatileContainer().size());
+ Iterator<Pair<Integer, ISchemaPage>> iterator =
manager.getCachedContainer().iterator();
+ Assert.assertEquals(2, iterator.next().right.getPageIndex());
+ Assert.assertEquals(5, iterator.next().right.getPageIndex());
+ Assert.assertEquals(6, iterator.next().right.getPageIndex());
+ Assert.assertFalse(iterator.hasNext());
+
+ manager.putVolatilePage(1, pageList.get(7));
+ Assert.assertEquals(2, manager.getCachedContainer().size());
+ Assert.assertEquals(3, manager.getVolatileContainer().size());
+ manager.putVolatilePage(1, pageList.get(8));
+ Assert.assertEquals(2, manager.getCachedContainer().size());
+ Assert.assertEquals(3, manager.getVolatileContainer().size());
+ manager.putCachedPage(1, pageList.get(9));
+ Assert.assertEquals(2, manager.getCachedContainer().size());
+ Assert.assertEquals(3, manager.getVolatileContainer().size());
+ iterator = manager.getCachedContainer().iterator();
+ Assert.assertEquals(6, iterator.next().right.getPageIndex());
+ Assert.assertEquals(9, iterator.next().right.getPageIndex());
+ Assert.assertFalse(iterator.hasNext());
+ iterator = manager.getVolatileContainer().iterator();
+ Assert.assertEquals(4, iterator.next().right.getPageIndex());
+ Assert.assertEquals(7, iterator.next().right.getPageIndex());
+ Assert.assertEquals(8, iterator.next().right.getPageIndex());
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testLockPage() {
+ PagePool pool1 = mock(PagePool.class);
+ PagePool pool2 = mock(PagePool.class);
+ manager.registerPagePool(1, pool1);
+ manager.registerPagePool(2, pool2);
+
+ // [1, 1] is locked volatile page
+ ISchemaPage page = mock(ISchemaPage.class);
+ when(page.getRefCnt()).thenReturn(new AtomicInteger(1));
+ when(page.getPageIndex()).thenReturn(1);
+ when(pool1.flush(page)).thenReturn(false);
+ manager.putVolatilePage(1, page);
+
+ // other page is not locked
+ for (int i = 0; i < 2; i++) {
+ page = mock(ISchemaPage.class);
+ when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+ when(page.getPageIndex()).thenReturn(2 + i);
+ when(pool2.flush(page)).thenReturn(true);
+ manager.putVolatilePage(2, page);
+ }
+ for (int i = 0; i < 2; i++) {
+ page = mock(ISchemaPage.class);
+ when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+ when(page.getPageIndex()).thenReturn(4 + i);
+ when(pool1.evict(page)).thenReturn(true);
+ manager.putCachedPage(1, page);
+ }
+
+ // check current status
+ Iterator<Pair<Integer, ISchemaPage>> iterator =
manager.getVolatileContainer().iterator();
+ int idx = 1;
+ while (iterator.hasNext()) {
+ Pair<Integer, ISchemaPage> pair = iterator.next();
+ Assert.assertEquals((idx == 2 || idx == 3) ? 2 : 1,
pair.left.intValue());
+ Assert.assertEquals(idx++, pair.right.getPageIndex());
+ }
+ iterator = manager.getCachedContainer().iterator();
+ while (iterator.hasNext()) {
+ Pair<Integer, ISchemaPage> pair = iterator.next();
+ Assert.assertEquals(1, pair.left.intValue());
+ Assert.assertEquals(idx++, pair.right.getPageIndex());
+ }
+
+ // add a new volatile page: the locked page must be skipped and the oldest unlocked volatile page flushed instead
+ page = mock(ISchemaPage.class);
+ when(page.getRefCnt()).thenReturn(new AtomicInteger(0));
+ when(page.getPageIndex()).thenReturn(6);
+ when(pool1.flush(page)).thenReturn(true);
+ manager.putVolatilePage(1, page);
+ iterator = manager.getVolatileContainer().iterator();
+ Assert.assertEquals(1, iterator.next().right.getPageIndex());
+ Assert.assertEquals(3, iterator.next().right.getPageIndex());
+ Assert.assertEquals(6, iterator.next().right.getPageIndex());
+ Assert.assertFalse(iterator.hasNext());
+ }
+}