This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
     new dd02cb89fee PBTree Improvement: concurrent operation and interleaved 
flush (#11698)
dd02cb89fee is described below

commit dd02cb89fee1f868bc04f8aabbd28c91b91efe26
Author: ZhaoXin <[email protected]>
AuthorDate: Fri Dec 15 09:30:19 2023 +0800

    PBTree Improvement: concurrent operation and interleaved flush (#11698)
---
 .../mtree/impl/pbtree/CachedMTreeStore.java        |   4 +-
 .../mtree/impl/pbtree/schemafile/ISchemaPage.java  |  35 +-
 .../impl/pbtree/schemafile/ISegmentedPage.java     |   2 +-
 .../mtree/impl/pbtree/schemafile/InternalPage.java |   9 +
 .../mtree/impl/pbtree/schemafile/SchemaFile.java   |   9 +-
 .../mtree/impl/pbtree/schemafile/SchemaPage.java   |  47 +-
 .../impl/pbtree/schemafile/SegmentedPage.java      |  16 +-
 .../schemafile/pagemgr/BTreePageManager.java       | 379 ++++++----
 .../pbtree/schemafile/pagemgr/IPageManager.java    |   9 +-
 .../pbtree/schemafile/pagemgr/PageManager.java     | 821 ++++++++++++++-------
 .../mtree/schemafile/SchemaFileLogTest.java        |   4 +-
 .../metadata/mtree/schemafile/SchemaFileTest.java  |   1 -
 12 files changed, 884 insertions(+), 452 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index d6ec24cb118..76aafffd435 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICa
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.iterator.CachedTraverserIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
-import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.MockSchemaFile;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
@@ -80,8 +79,7 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
       Runnable flushCallback)
       throws MetadataException, IOException {
     this.schemaRegionId = schemaRegionId;
-    //    file = SchemaFile.initSchemaFile(storageGroup.getFullPath(), 
schemaRegionId);
-    file = new MockSchemaFile(storageGroup);
+    file = SchemaFile.initSchemaFile(storageGroup.getFullPath(), 
schemaRegionId);
     root = file.init();
     this.regionStatistics = regionStatistics;
     this.memManager = new MemManager(regionStatistics);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
index 9a6b543734f..305c30fb94b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
@@ -27,6 +27,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public interface ISchemaPage {
   /**
@@ -62,7 +65,8 @@ public interface ISchemaPage {
   }
 
   /** InternalPage should be initiated with a pointer which points to the 
minimal child of it. */
-  static SchemaPage initInternalPage(ByteBuffer buffer, int pageIndex, int 
ptr) {
+  static SchemaPage initInternalPage(
+      ByteBuffer buffer, int pageIndex, int ptr, AtomicInteger ai, 
ReadWriteLock rwl) {
     buffer.clear();
 
     buffer.position(SchemaFileConfig.PAGE_HEADER_SIZE);
@@ -84,10 +88,16 @@ public interface ISchemaPage {
     ReadWriteIOUtils.write(-1L, buffer);
     ReadWriteIOUtils.write(-1, buffer);
 
-    return new InternalPage(buffer);
+    return new InternalPage(buffer, ai, rwl);
   }
 
-  static ISegmentedPage initSegmentedPage(ByteBuffer buffer, int pageIndex) {
+  static SchemaPage initInternalPage(ByteBuffer buffer, int pageIndex, int 
ptr) {
+    return initInternalPage(
+        buffer, pageIndex, ptr, new AtomicInteger(), new 
ReentrantReadWriteLock());
+  }
+
+  static ISegmentedPage initSegmentedPage(
+      ByteBuffer buffer, int pageIndex, AtomicInteger ai, ReadWriteLock rwl) {
     buffer.clear();
     ReadWriteIOUtils.write(SchemaFileConfig.SEGMENTED_PAGE, buffer);
     ReadWriteIOUtils.write(pageIndex, buffer);
@@ -95,7 +105,12 @@ public interface ISchemaPage {
     ReadWriteIOUtils.write((short) (buffer.capacity() - 
SchemaFileConfig.PAGE_HEADER_SIZE), buffer);
     ReadWriteIOUtils.write((short) 0, buffer);
     ReadWriteIOUtils.write(-1L, buffer);
-    return new SegmentedPage(buffer);
+    return new SegmentedPage(buffer, ai, rwl);
+  }
+
+  static ISegmentedPage initSegmentedPage(ByteBuffer buffer, int pageIndex) {
+    return ISchemaPage.initSegmentedPage(
+        buffer, pageIndex, new AtomicInteger(), new ReentrantReadWriteLock());
   }
 
   static SchemaPage initAliasIndexPage(ByteBuffer buffer, int pageIndex) {
@@ -109,10 +124,18 @@ public interface ISchemaPage {
     return new AliasIndexPage(buffer);
   }
 
+  AtomicInteger getRefCnt();
+
+  ReadWriteLock getLock();
+
   void syncPageBuffer();
 
   void flushPageToChannel(FileChannel channel) throws IOException;
 
+  boolean isDirtyPage();
+
+  void setDirtyFlag();
+
   void flushPageToStream(OutputStream stream) throws IOException;
 
   String inspect() throws SegmentNotFoundException;
@@ -133,10 +156,6 @@ public interface ISchemaPage {
 
   ByteBuffer getEntireSegmentSlice() throws MetadataException;
 
-  void markDirty();
-
-  boolean isDirty();
-
   @TestOnly
   WrappedSegment getSegmentOnTest(short idx) throws SegmentNotFoundException;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
index 5d69f37dd6f..3e0f44db0e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
@@ -80,7 +80,7 @@ public interface ISegmentedPage extends ISchemaPage {
    * Transplant designated segment from srcPage, to spare space of the page
    *
    * @param srcPage source page conveys source segment
-   * @param segId id of the target segment
+   * @param segId id of the designated segment from the srcPage
    * @param newSegSize size of new segment in this page
    * @throws MetadataException if spare not enough, segment not found or 
inconsistency
    */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
index 724e97f67bf..f1c2d252f84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
 
 /**
  * A page which acts like a segment, manages index entry of the b+ tree 
constructed by MNode with
@@ -67,6 +69,13 @@ public class InternalPage extends SchemaPage implements 
ISegment<Integer, Intege
     subIndexPage = ReadWriteIOUtils.readInt(pageBuffer);
   }
 
+  /** compatible constructor for replacement */
+  public InternalPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock 
rwl) {
+    super(pageBuffer, ai, rwl);
+    firstLeaf = ReadWriteIOUtils.readLong(pageBuffer);
+    subIndexPage = ReadWriteIOUtils.readInt(pageBuffer);
+  }
+
   @Override
   public int insertRecord(String key, Integer pointer) throws 
RecordDuplicatedException {
     // TODO: remove debug parameter INTERNAL_SPLIT_VALVE
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index c703e641345..b8ac83ff915 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -243,10 +243,7 @@ public class SchemaFile implements ISchemaFile {
                 node.getFullPath()));
       }
     }
-
-    pageManager.writeNewChildren(node);
-    pageManager.writeUpdatedChildren(node);
-    pageManager.flushDirtyPages();
+    pageManager.writeMNode(node);
     updateHeaderBuffer();
   }
 
@@ -270,7 +267,6 @@ public class SchemaFile implements ISchemaFile {
   @Override
   public void close() throws IOException {
     updateHeaderBuffer();
-    pageManager.flushDirtyPages();
     pageManager.close();
     forceChannel();
     channel.close();
@@ -279,7 +275,6 @@ public class SchemaFile implements ISchemaFile {
   @Override
   public void sync() throws IOException {
     updateHeaderBuffer();
-    pageManager.flushDirtyPages();
     forceChannel();
   }
 
@@ -414,7 +409,7 @@ public class SchemaFile implements ISchemaFile {
     return (short) (globalIndex & SchemaFileConfig.SEG_INDEX_MASK);
   }
 
-  /** TODO: shall merge with {@linkplain PageManager#reEstimateSegSize} */
+  /** TODO: shall merge with PageManager#reEstimateSegSize */
   static short reEstimateSegSize(int oldSize) {
     for (short size : SchemaFileConfig.SEG_SIZE_LST) {
       if (oldSize < size) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
index d72c80b6da3..d10b8c23fcf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
@@ -27,6 +27,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * {@link SchemaFile} manages a collection of SchemaPages, which acts like a 
segment of index entry
@@ -37,6 +40,9 @@ public abstract class SchemaPage implements ISchemaPage {
   // All other attributes are to describe this ByteBuffer
   protected final ByteBuffer pageBuffer;
 
+  protected final AtomicInteger refCnt;
+  protected final ReadWriteLock lock;
+
   protected int pageIndex; // only change when register buffer as a new page
   protected short spareOffset; // bound of the variable length part in a 
slotted structure
   protected short spareSize; // traces spare space size simultaneously
@@ -44,19 +50,38 @@ public abstract class SchemaPage implements ISchemaPage {
 
   protected boolean dirtyFlag = false; // any modification turns it true
 
-  protected SchemaPage(ByteBuffer pageBuffer) {
+  /** for replacement page state transfer * */
+  protected SchemaPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock 
rwl) {
     this.pageBuffer = pageBuffer;
 
     this.pageBuffer
         .limit(this.pageBuffer.capacity())
         .position(SchemaFileConfig.PAGE_HEADER_INDEX_OFFSET);
 
+    refCnt = ai;
+    lock = rwl;
+
     pageIndex = ReadWriteIOUtils.readInt(this.pageBuffer);
     spareOffset = ReadWriteIOUtils.readShort(this.pageBuffer);
     spareSize = ReadWriteIOUtils.readShort(this.pageBuffer);
     memberNum = ReadWriteIOUtils.readShort(this.pageBuffer);
   }
 
+  /** compatible constructor with no existing ref or lock */
+  protected SchemaPage(ByteBuffer pageBuffer) {
+    this(pageBuffer, new AtomicInteger(), new ReentrantReadWriteLock());
+  }
+
+  @Override
+  public AtomicInteger getRefCnt() {
+    return refCnt;
+  }
+
+  @Override
+  public ReadWriteLock getLock() {
+    return lock;
+  }
+
   @Override
   public void syncPageBuffer() {
     this.pageBuffer.limit(this.pageBuffer.capacity());
@@ -74,6 +99,16 @@ public abstract class SchemaPage implements ISchemaPage {
     dirtyFlag = false;
   }
 
+  @Override
+  public boolean isDirtyPage() {
+    return dirtyFlag;
+  }
+
+  @Override
+  public void setDirtyFlag() {
+    this.dirtyFlag = true;
+  }
+
   @Override
   public void flushPageToStream(OutputStream stream) throws IOException {
     if (pageIndex < 0) {
@@ -129,16 +164,6 @@ public abstract class SchemaPage implements ISchemaPage {
     return null;
   }
 
-  @Override
-  public void markDirty() {
-    dirtyFlag = true;
-  }
-
-  @Override
-  public boolean isDirty() {
-    return dirtyFlag;
-  }
-
   @Override
   @TestOnly
   public WrappedSegment getSegmentOnTest(short idx) throws 
SegmentNotFoundException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
index 0082f7d3665..b5de61e9da6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
@@ -33,6 +33,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SegmentedPage extends SchemaPage implements ISegmentedPage {
 
@@ -67,8 +70,19 @@ public class SegmentedPage extends SchemaPage implements 
ISegmentedPage {
    *       #deleteSegment} mentioned.
    * </ul>
    */
+  public SegmentedPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock 
rwl) {
+    super(pageBuffer, ai, rwl);
+    segCacheMap = new ConcurrentHashMap<>();
+    segOffsetLst = new ArrayList<>();
+
+    pageBuffer.position(pageBuffer.capacity() - SchemaFileConfig.SEG_OFF_DIG * 
memberNum);
+    for (int idx = 0; idx < memberNum; idx++) {
+      segOffsetLst.add(ReadWriteIOUtils.readShort(pageBuffer));
+    }
+  }
+
   public SegmentedPage(ByteBuffer pageBuffer) {
-    super(pageBuffer);
+    super(pageBuffer, new AtomicInteger(), new ReentrantReadWriteLock());
     segCacheMap = new ConcurrentHashMap<>();
     segOffsetLst = new ArrayList<>();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
index e80c4dfb1dd..7c0bfd6f66d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
@@ -49,10 +49,10 @@ public class BTreePageManager extends PageManager {
 
   @Override
   protected void multiPageInsertOverflowOperation(
-      ISchemaPage curPage, String key, ByteBuffer childBuffer)
+      ISchemaPage curPage, String key, ByteBuffer childBuffer, 
SchemaPageContext cxt)
       throws MetadataException, IOException {
     // cur is a leaf, split and set next ptr as link between
-    ISegmentedPage newPage = 
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ);
+    ISegmentedPage newPage = 
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ, cxt);
     newPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
     String sk =
         curPage
@@ -61,16 +61,16 @@ public class BTreePageManager extends PageManager {
     curPage
         .getAsSegmentedPage()
         .setNextSegAddress((short) 0, getGlobalIndex(newPage.getPageIndex(), 
(short) 0));
-    markDirty(curPage);
-    insertIndexEntryEntrance(curPage, newPage, sk);
+    cxt.markDirty(curPage);
+    insertIndexEntryEntrance(curPage, newPage, sk, cxt);
   }
 
   @Override
   protected void multiPageUpdateOverflowOperation(
-      ISchemaPage curPage, String key, ByteBuffer childBuffer)
+      ISchemaPage curPage, String key, ByteBuffer childBuffer, 
SchemaPageContext cxt)
       throws MetadataException, IOException {
-    // split and update higer nodes
-    ISegmentedPage splPage = 
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ);
+    // even split and update higher nodes
+    ISegmentedPage splPage = 
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ, cxt);
     splPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
     String sk = curPage.getAsSegmentedPage().splitWrappedSegment(null, null, 
splPage, false);
     curPage
@@ -85,7 +85,7 @@ public class BTreePageManager extends PageManager {
     }
 
     // insert index entry upward
-    insertIndexEntryEntrance(curPage, splPage, sk);
+    insertIndexEntryEntrance(curPage, splPage, sk, cxt);
   }
 
   /**
@@ -96,8 +96,9 @@ public class BTreePageManager extends PageManager {
    * @throws IOException
    */
   @Override
-  protected void buildSubIndex(ICachedMNode parNode) throws MetadataException, 
IOException {
-    ISchemaPage cursorPage = 
getPageInstance(getPageIndex(getNodeAddress(parNode)));
+  protected void buildSubIndex(ICachedMNode parNode, SchemaPageContext cxt)
+      throws MetadataException, IOException {
+    ISchemaPage cursorPage = 
getPageInstance(getPageIndex(getNodeAddress(parNode)), cxt);
 
     if (cursorPage.getAsInternalPage() == null) {
       throw new MetadataException("Subordinate index shall not build upon 
single page segment.");
@@ -106,12 +107,12 @@ public class BTreePageManager extends PageManager {
     ISchemaPage tPage = cursorPage; // reserve the top page to modify subIndex
     ISchemaPage subIndexPage =
         
ISchemaPage.initAliasIndexPage(ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
 -1);
-    registerAsNewPage(subIndexPage);
+    registerAsNewPage(subIndexPage, cxt);
 
     // transfer cursorPage to leaf page
     while (cursorPage.getAsInternalPage() != null) {
       cursorPage =
-          
getPageInstance(getPageIndex(cursorPage.getAsInternalPage().getNextSegAddress()));
+          
getPageInstance(getPageIndex(cursorPage.getAsInternalPage().getNextSegAddress()),
 cxt);
     }
 
     long nextAddr = cursorPage.getAsSegmentedPage().getNextSegAddress((short) 
0);
@@ -120,7 +121,7 @@ public class BTreePageManager extends PageManager {
     // TODO: inefficient to build B+Tree up-to-bottom, improve further
     while (!children.isEmpty() || nextAddr != -1L) {
       if (children.isEmpty()) {
-        cursorPage = getPageInstance(getPageIndex(nextAddr));
+        cursorPage = getPageInstance(getPageIndex(nextAddr), cxt);
         nextAddr = cursorPage.getAsSegmentedPage().getNextSegAddress((short) 
0);
         children = cursorPage.getAsSegmentedPage().getChildren((short) 0);
       }
@@ -130,7 +131,7 @@ public class BTreePageManager extends PageManager {
           && child.getAsMeasurementMNode().getAlias() != null) {
         subIndexPage =
             insertAliasIndexEntry(
-                subIndexPage, child.getAsMeasurementMNode().getAlias(), 
child.getName());
+                subIndexPage, child.getAsMeasurementMNode().getAlias(), 
child.getName(), cxt);
       }
     }
 
@@ -138,30 +139,31 @@ public class BTreePageManager extends PageManager {
   }
 
   @Override
-  protected void insertSubIndexEntry(int base, String key, String rec)
+  protected void insertSubIndexEntry(int base, String key, String rec, 
SchemaPageContext cxt)
       throws MetadataException, IOException {
-    insertAliasIndexEntry(getPageInstance(base), key, rec);
+    insertAliasIndexEntry(getPageInstance(base, cxt), key, rec, cxt);
   }
 
   @Override
-  protected void removeSubIndexEntry(int base, String oldAlias)
+  protected void removeSubIndexEntry(int base, String oldAlias, 
SchemaPageContext cxt)
       throws MetadataException, IOException {
-    ISchemaPage tarPage = getTargetLeafPage(getPageInstance(base), oldAlias);
+    ISchemaPage tarPage = getTargetLeafPage(getPageInstance(base, cxt), 
oldAlias, cxt);
     tarPage.getAsAliasIndexPage().removeRecord(oldAlias);
   }
 
   @Override
-  protected String searchSubIndexAlias(int base, String alias)
+  protected String searchSubIndexAlias(int base, String alias, 
SchemaPageContext cxt)
       throws MetadataException, IOException {
-    return getTargetLeafPage(getPageInstance(base), alias)
+    return getTargetLeafPage(getPageInstance(base, cxt), alias, cxt)
         .getAsAliasIndexPage()
         .getRecordByAlias(alias);
   }
 
   /** @return top page to insert index */
-  private ISchemaPage insertAliasIndexEntry(ISchemaPage topPage, String alias, 
String name)
+  private ISchemaPage insertAliasIndexEntry(
+      ISchemaPage topPage, String alias, String name, SchemaPageContext cxt)
       throws MetadataException, IOException {
-    ISchemaPage tarPage = getTargetLeafPage(topPage, alias);
+    ISchemaPage tarPage = getTargetLeafPage(topPage, alias, cxt);
     if (tarPage.getAsAliasIndexPage() == null) {
       throw new MetadataException("File may be corrupted that subordinate 
index has broken.");
     }
@@ -174,23 +176,25 @@ public class BTreePageManager extends PageManager {
               .getAsAliasIndexPage()
               .splitByKey(alias, name, spltBuf, 
SchemaFileConfig.INCLINED_SPLIT);
       ISchemaPage splPage = ISchemaPage.loadSchemaPage(spltBuf);
-      registerAsNewPage(splPage);
+      registerAsNewPage(splPage, cxt);
 
-      if (treeTrace[0] < 1) {
-        // From single sub-index page to tree structure
+      if (cxt.treeTrace[0] < 1) {
+        // Transfer single sub-index page to tree structure
         ByteBuffer trsBuf = ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
         tarPage.getAsAliasIndexPage().extendsTo(trsBuf);
         ISchemaPage trsPage = ISchemaPage.loadSchemaPage(trsBuf);
 
         // Notice that index of tarPage belongs to repPage then
-        registerAsNewPage(trsPage);
+        registerAsNewPage(trsPage, cxt);
 
         // tarPage abolished since then
         ISchemaPage repPage =
             ISchemaPage.initInternalPage(
                 ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
                 tarPage.getPageIndex(),
-                trsPage.getPageIndex());
+                trsPage.getPageIndex(),
+                tarPage.getRefCnt(),
+                tarPage.getLock());
 
         if (0 > repPage.getAsInternalPage().insertRecord(sk, 
splPage.getPageIndex())) {
           throw new ColossalRecordException(sk, alias);
@@ -200,11 +204,12 @@ public class BTreePageManager extends PageManager {
             .getAsInternalPage()
             .setNextSegAddress(getGlobalIndex(trsPage.getPageIndex(), (short) 
0));
 
-        replacePageInCache(repPage);
+        replacePageInCache(repPage, cxt);
         return repPage;
       } else {
-        insertIndexEntryRecursiveUpwards(treeTrace[0], sk, 
splPage.getPageIndex());
-        return getPageInstance(treeTrace[1]);
+        // no interleaved flush implemented since alias of the incoming 
records are NOT ordered
+        insertIndexEntryRecursiveUpwards(cxt.treeTrace[0], sk, 
splPage.getPageIndex(), cxt);
+        return getPageInstance(cxt.treeTrace[1], cxt);
       }
     }
     return topPage;
@@ -217,28 +222,43 @@ public class BTreePageManager extends PageManager {
    * @param splPage new page for original page to split.
    * @param sk least key of splPage.
    */
-  private void insertIndexEntryEntrance(ISchemaPage curPage, ISchemaPage 
splPage, String sk)
+  private void insertIndexEntryEntrance(
+      ISchemaPage curPage, ISchemaPage splPage, String sk, SchemaPageContext 
cxt)
       throws MetadataException, IOException {
-    if (treeTrace[0] < 1) {
-      ISegmentedPage trsPage = 
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ);
+    if (cxt.treeTrace[0] < 1) {
+      // To make parent pointer valid after the btree established, curPage 
need to be transformed
+      //  into an InternalPage. Since NOW page CANNOT transform in place, a 
substitute InternalPage
+      //  inherits index from curPage initiated.
+      ISegmentedPage trsPage = 
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ, cxt);
       trsPage.transplantSegment(
           curPage.getAsSegmentedPage(), (short) 0, 
SchemaFileConfig.SEG_MAX_SIZ);
+
+      // repPage inherits lock and referent from curPage, which is always 
locked by entrant.
       ISchemaPage repPage =
           ISchemaPage.initInternalPage(
               ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
               curPage.getPageIndex(),
-              trsPage.getPageIndex());
+              trsPage.getPageIndex(),
+              curPage.getRefCnt(),
+              curPage.getLock());
 
+      // link right child of the initiated InternalPage
       if (0 > repPage.getAsInternalPage().insertRecord(sk, 
splPage.getPageIndex())) {
         throw new ColossalRecordException(sk);
       }
 
+      // setNextSegment of root as the left-most leaf
       repPage
           .getAsInternalPage()
           .setNextSegAddress(getGlobalIndex(trsPage.getPageIndex(), (short) 
0));
-      replacePageInCache(repPage);
+
+      // mark the left-most child for interleaved flush, given the write 
operation is ordered
+      cxt.invokeLastLeaf(trsPage);
+      replacePageInCache(repPage, cxt);
     } else {
-      insertIndexEntryRecursiveUpwards(treeTrace[0], sk, 
splPage.getPageIndex());
+      // if the write starts from non-left-most leaf, it shall be recorded as 
well
+      cxt.invokeLastLeaf(curPage);
+      insertIndexEntryRecursiveUpwards(cxt.treeTrace[0], sk, 
splPage.getPageIndex(), cxt);
     }
   }
 
@@ -246,12 +266,14 @@ public class BTreePageManager extends PageManager {
    * Insert an index entry into an internal page. Cascade insert or internal 
split conducted if
    * necessary.
    *
+   * @param treeTraceIndex position of page index which had been traced
    * @param key key of the entry
    * @param ptr pointer of the entry
    */
-  private void insertIndexEntryRecursiveUpwards(int treeTraceIndex, String 
key, int ptr)
+  private void insertIndexEntryRecursiveUpwards(
+      int treeTraceIndex, String key, int ptr, SchemaPageContext cxt)
       throws MetadataException, IOException {
-    ISchemaPage idxPage = getPageInstance(treeTrace[treeTraceIndex]);
+    ISchemaPage idxPage = getPageInstance(cxt.treeTrace[treeTraceIndex], cxt);
     if (idxPage.getAsInternalPage().insertRecord(key, ptr) < 0) {
       // handle when insert an index entry occurring an overflow
       if (treeTraceIndex > 1) {
@@ -262,14 +284,14 @@ public class BTreePageManager extends PageManager {
                 .getAsInternalPage()
                 .splitByKey(key, ptr, dstBuffer, 
SchemaFileConfig.INCLINED_SPLIT);
         ISchemaPage dstPage = ISchemaPage.loadSchemaPage(dstBuffer);
-        registerAsNewPage(dstPage);
-        insertIndexEntryRecursiveUpwards(treeTraceIndex - 1, splitKey, 
dstPage.getPageIndex());
+        registerAsNewPage(dstPage, cxt);
+        insertIndexEntryRecursiveUpwards(treeTraceIndex - 1, splitKey, 
dstPage.getPageIndex(), cxt);
       } else {
         // treeTraceIndex==1, idxPage is the root of B+Tree, to split for new 
root internal
         ByteBuffer splBuffer = 
ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
         ByteBuffer trsBuffer = 
ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
 
-        // idxPage shall be split, and reserved as root of B+Tree
+        // idxPage splits and transplants, and remains to be root of B+Tree
         String splitKey =
             idxPage
                 .getAsInternalPage()
@@ -277,8 +299,8 @@ public class BTreePageManager extends PageManager {
         idxPage.getAsInternalPage().extendsTo(trsBuffer);
         ISchemaPage splPage = ISchemaPage.loadSchemaPage(splBuffer);
         ISchemaPage trsPage = ISchemaPage.loadSchemaPage(trsBuffer);
-        registerAsNewPage(splPage);
-        registerAsNewPage(trsPage);
+        registerAsNewPage(splPage, cxt);
+        registerAsNewPage(trsPage, cxt);
 
         idxPage.getAsInternalPage().resetBuffer(trsPage.getPageIndex());
         if (idxPage.getAsInternalPage().insertRecord(splitKey, 
splPage.getPageIndex()) < 0) {
@@ -289,173 +311,210 @@ public class BTreePageManager extends PageManager {
             
.setNextSegAddress(trsPage.getAsInternalPage().getNextSegAddress());
       }
     }
-    markDirty(idxPage);
-    addPageToCache(idxPage.getPageIndex(), idxPage);
+    cxt.markDirty(idxPage);
   }
 
   @Override
   public void delete(ICachedMNode node) throws IOException, MetadataException {
-    // remove corresponding record
-    long recSegAddr = getNodeAddress(node.getParent());
-    recSegAddr = getTargetSegmentAddress(recSegAddr, node.getName());
-    ISchemaPage tarPage = getPageInstance(getPageIndex(recSegAddr));
-    markDirty(tarPage);
-    tarPage.getAsSegmentedPage().removeRecord(getSegIndex(recSegAddr), 
node.getName());
-
-    // remove segments belongs to node
-    if (!node.isMeasurement() && getNodeAddress(node) > 0) {
-      // node with maliciously modified address may result in orphan pages
-      long delSegAddr = getNodeAddress(node);
-      tarPage = getPageInstance(getPageIndex(delSegAddr));
-
-      if (tarPage.getAsSegmentedPage() != null) {
-        // TODO: may produce fractured page
-        markDirty(tarPage);
-        tarPage.getAsSegmentedPage().deleteSegment(getSegIndex(delSegAddr));
-        if (tarPage.getAsSegmentedPage().validSegments() == 0) {
-          tarPage.getAsSegmentedPage().purgeSegments();
-        }
-      }
-
-      if (tarPage.getAsInternalPage() != null) {
-        // If the deleted one points to an Internal (root of BTree), there are 
two BTrees to handle:
-        //  one mapping node names to record buffers, and another mapping 
aliases to names. </br>
-        // All of those are turned into SegmentedPage.
-        Deque<Integer> cascadePages = new 
ArrayDeque<>(tarPage.getAsInternalPage().getAllRecords());
-        cascadePages.add(tarPage.getPageIndex());
-
-        if (tarPage.getSubIndex() >= 0) {
-          cascadePages.add(tarPage.getSubIndex());
-        }
-
-        while (!cascadePages.isEmpty()) {
-          if (dirtyPages.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
-            flushDirtyPages();
-          }
-
-          tarPage = getPageInstance(cascadePages.poll());
-          if (tarPage.getAsSegmentedPage() != null) {
+    cacheGuardian();
+    SchemaPageContext cxt = new SchemaPageContext();
+    // node is the record deleted from its segment
+    entrantLock(node.getParent(), cxt);
+    try {
+      // remove corresponding record
+      long recSegAddr = getNodeAddress(node.getParent());
+      recSegAddr = getTargetSegmentAddress(recSegAddr, node.getName(), cxt);
+      ISchemaPage tarPage = getPageInstance(getPageIndex(recSegAddr), cxt);
+      cxt.markDirty(tarPage);
+      tarPage.getAsSegmentedPage().removeRecord(getSegIndex(recSegAddr), 
node.getName());
+
+      // remove segments belonging to node
+      if (!node.isMeasurement() && getNodeAddress(node) > 0) {
+        // node with maliciously modified address may result in orphan pages
+        long delSegAddr = getNodeAddress(node);
+        tarPage = getPageInstance(getPageIndex(delSegAddr), cxt);
+
+        if (tarPage.getAsSegmentedPage() != null) {
+          // TODO: may produce fractured page
+          cxt.markDirty(tarPage);
+          tarPage.getAsSegmentedPage().deleteSegment(getSegIndex(delSegAddr));
+          if (tarPage.getAsSegmentedPage().validSegments() == 0) {
             tarPage.getAsSegmentedPage().purgeSegments();
-            markDirty(tarPage);
-            addPageToCache(tarPage.getPageIndex(), tarPage);
-            continue;
+            cxt.indexBuckets.sortIntoBucket(tarPage, (short) -1);
           }
+        }
 
-          if (tarPage.getAsInternalPage() != null) {
-            cascadePages.addAll(tarPage.getAsInternalPage().getAllRecords());
+        if (tarPage.getAsInternalPage() != null) {
+          // If the deleted one points to an Internal (root of BTree),
+          //  there are two BTrees to handle:
+          //  one mapping node names to record buffers,
+          //  and another mapping aliases to names.
+          // All of those are turned into SegmentedPage.
+          Deque<Integer> cascadePages =
+              new ArrayDeque<>(tarPage.getAsInternalPage().getAllRecords());
+          cascadePages.add(tarPage.getPageIndex());
+
+          if (tarPage.getSubIndex() >= 0) {
+            cascadePages.add(tarPage.getSubIndex());
           }
 
-          tarPage =
-              ISchemaPage.initSegmentedPage(
-                  ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH), 
tarPage.getPageIndex());
-          replacePageInCache(tarPage);
+          while (!cascadePages.isEmpty()) {
+            tarPage = getPageInstance(cascadePages.poll(), cxt);
+            if (tarPage.getAsSegmentedPage() != null) {
+              tarPage.getAsSegmentedPage().purgeSegments();
+              cxt.markDirty(tarPage);
+              cxt.indexBuckets.sortIntoBucket(tarPage, (short) -1);
+              continue;
+            }
+
+            if (tarPage.getAsInternalPage() != null) {
+              cascadePages.addAll(tarPage.getAsInternalPage().getAllRecords());
+            }
+
+            tarPage =
+                ISchemaPage.initSegmentedPage(
+                    ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
+                    tarPage.getPageIndex(),
+                    tarPage.getRefCnt(),
+                    tarPage.getLock());
+            replacePageInCache(tarPage, cxt);
+          }
         }
       }
+      flushDirtyPages(cxt);
+    } finally {
+      releaseLocks(cxt);
+      releaseReferent(cxt);
     }
-    flushDirtyPages();
   }
 
   @Override
   public ICachedMNode getChildNode(ICachedMNode parent, String childName)
       throws MetadataException, IOException {
+    // TODO unnecessary context var
+    SchemaPageContext cxt = new SchemaPageContext();
+
     if (getNodeAddress(parent) < 0) {
       throw new MetadataException(
           String.format(
               "Node [%s] has no valid segment address in pbtree file.", 
parent.getFullPath()));
     }
 
-    long actualSegAddr = getTargetSegmentAddress(getNodeAddress(parent), 
childName);
-    ICachedMNode child =
-        getPageInstance(getPageIndex(actualSegAddr))
-            .getAsSegmentedPage()
-            .read(getSegIndex(actualSegAddr), childName);
-
-    if (child == null && parent.isDevice()) {
-      // try read alias directly first
-      child =
-          getPageInstance(getPageIndex(actualSegAddr))
+    // a single read lock on initial page is sufficient to mutex write, no 
need to trace it
+    int initIndex = getPageIndex(getNodeAddress(parent));
+    getPageInstance(initIndex, cxt).getLock().readLock().lock();
+    try {
+      long actualSegAddr = getTargetSegmentAddress(getNodeAddress(parent), 
childName, cxt);
+      ICachedMNode child =
+          getPageInstance(getPageIndex(actualSegAddr), cxt)
               .getAsSegmentedPage()
-              .readByAlias(getSegIndex(actualSegAddr), childName);
-      if (child != null) {
-        return child;
-      }
+              .read(getSegIndex(actualSegAddr), childName);
+
+      if (child == null && parent.isDevice()) {
+        // try read alias directly first
+        child =
+            getPageInstance(getPageIndex(actualSegAddr), cxt)
+                .getAsSegmentedPage()
+                .readByAlias(getSegIndex(actualSegAddr), childName);
+        if (child != null) {
+          return child;
+        }
 
-      // try read with sub-index
-      return getChildWithAlias(parent, childName);
+        // try read with sub-index
+        return getChildWithAlias(parent, childName);
+      }
+      return child;
+    } finally {
+      getPageInstance(initIndex, cxt).getLock().readLock().unlock();
+      releaseReferent(cxt);
     }
-    return child;
   }
 
   private ICachedMNode getChildWithAlias(ICachedMNode par, String alias)
       throws IOException, MetadataException {
+    // TODO unnecessary context var
+    SchemaPageContext cxt = new SchemaPageContext();
+
     long srtAddr = getNodeAddress(par);
-    ISchemaPage page = getPageInstance(getPageIndex(srtAddr));
+    ISchemaPage page = getPageInstance(getPageIndex(srtAddr), cxt);
 
     if (page.getAsInternalPage() == null || page.getSubIndex() < 0) {
       return null;
     }
 
-    String name = searchSubIndexAlias(page.getSubIndex(), alias);
+    String name = searchSubIndexAlias(page.getSubIndex(), alias, cxt);
 
     if (name == null) {
       return null;
     }
 
-    return getTargetLeafPage(page, name).getAsSegmentedPage().read((short) 0, 
name);
+    return getTargetLeafPage(page, name, 
cxt).getAsSegmentedPage().read((short) 0, name);
   }
 
   @Override
   public Iterator<ICachedMNode> getChildren(ICachedMNode parent)
       throws MetadataException, IOException {
+    SchemaPageContext cxt = new SchemaPageContext();
     int pageIdx = getPageIndex(getNodeAddress(parent));
+
     short segId = getSegIndex(getNodeAddress(parent));
-    ISchemaPage page = getPageInstance(pageIdx);
+    ISchemaPage page = getPageInstance(pageIdx, cxt);
+    page.getLock().readLock().lock();
 
-    while (page.getAsSegmentedPage() == null) {
-      page = 
getPageInstance(getPageIndex(page.getAsInternalPage().getNextSegAddress()));
-    }
+    try {
+      while (page.getAsSegmentedPage() == null) {
+        page = 
getPageInstance(getPageIndex(page.getAsInternalPage().getNextSegAddress()), 
cxt);
+      }
 
-    long actualSegAddr = page.getAsSegmentedPage().getNextSegAddress(segId);
-    Queue<ICachedMNode> initChildren = 
page.getAsSegmentedPage().getChildren(segId);
-    return new Iterator<ICachedMNode>() {
-      long nextSeg = actualSegAddr;
-      Queue<ICachedMNode> children = initChildren;
+      long actualSegAddr = page.getAsSegmentedPage().getNextSegAddress(segId);
+      Queue<ICachedMNode> initChildren = 
page.getAsSegmentedPage().getChildren(segId);
 
-      @Override
-      public boolean hasNext() {
-        if (!children.isEmpty()) {
-          return true;
-        }
-        if (nextSeg < 0) {
-          return false;
-        }
+      return new Iterator<ICachedMNode>() {
+        long nextSeg = actualSegAddr;
+        Queue<ICachedMNode> children = initChildren;
 
-        try {
-          ISchemaPage nPage;
-          while (children.isEmpty() && nextSeg >= 0) {
-            nPage = getPageInstance(getPageIndex(nextSeg));
-            children = 
nPage.getAsSegmentedPage().getChildren(getSegIndex(nextSeg));
-            nextSeg = 
nPage.getAsSegmentedPage().getNextSegAddress(getSegIndex(nextSeg));
+        @Override
+        public boolean hasNext() {
+          if (!children.isEmpty()) {
+            return true;
+          }
+          if (nextSeg < 0) {
+            return false;
           }
-        } catch (MetadataException | IOException e) {
-          logger.error(e.getMessage());
-          return false;
-        }
 
-        return !children.isEmpty();
-      }
+          try {
+            ISchemaPage nPage;
+            while (children.isEmpty() && nextSeg >= 0) {
+              nPage = getPageInstance(getPageIndex(nextSeg), cxt);
+              children = 
nPage.getAsSegmentedPage().getChildren(getSegIndex(nextSeg));
+              nextSeg = 
nPage.getAsSegmentedPage().getNextSegAddress(getSegIndex(nextSeg));
+              // children iteration need not pin page, consistency is 
guaranteed by upper layer
+              nPage.getRefCnt().decrementAndGet();
+            }
+          } catch (MetadataException | IOException e) {
+            logger.error(e.getMessage());
+            return false;
+          }
 
-      @Override
-      public ICachedMNode next() {
-        return children.poll();
-      }
-    };
+          return !children.isEmpty();
+        }
+
+        @Override
+        public ICachedMNode next() {
+          return children.poll();
+        }
+      };
+    } finally {
+      // safety of iterator should be guaranteed by upper layer
+      getPageInstance(pageIdx, cxt).getLock().readLock().unlock();
+      releaseReferent(cxt);
+    }
   }
 
   /** Seek non-InternalPage by name, syntax sugar of {@linkplain 
#getTargetSegmentAddress}. */
-  private ISchemaPage getTargetLeafPage(ISchemaPage topPage, String recKey)
+  private ISchemaPage getTargetLeafPage(ISchemaPage topPage, String recKey, 
SchemaPageContext cxt)
       throws IOException, MetadataException {
-    treeTrace[0] = 0;
+    cxt.treeTrace[0] = 0;
     if (topPage.getAsInternalPage() == null) {
       return topPage;
     }
@@ -464,10 +523,10 @@ public class BTreePageManager extends PageManager {
     int i = 0; // mark the trace of b+ tree node
     while (curPage.getAsInternalPage() != null) {
       i++;
-      treeTrace[i] = curPage.getPageIndex();
-      curPage = 
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey));
+      cxt.treeTrace[i] = curPage.getPageIndex();
+      curPage = 
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey), cxt);
     }
-    treeTrace[0] = i; // bound in no.0 elem, points the parent the return
+    cxt.treeTrace[0] = i; // bound in no.0 elem, points to the parent of the returned page
 
     return curPage;
   }
@@ -480,10 +539,10 @@ public class BTreePageManager extends PageManager {
    * @return address of the target segment.
    */
   @Override
-  protected long getTargetSegmentAddress(long curSegAddr, String recKey)
+  protected long getTargetSegmentAddress(long curSegAddr, String recKey, 
SchemaPageContext cxt)
       throws IOException, MetadataException {
-    treeTrace[0] = 0;
-    ISchemaPage curPage = getPageInstance(getPageIndex(curSegAddr));
+    cxt.treeTrace[0] = 0;
+    ISchemaPage curPage = getPageInstance(getPageIndex(curSegAddr), cxt);
     if (curPage.getAsSegmentedPage() != null) {
       return curSegAddr;
     }
@@ -491,10 +550,10 @@ public class BTreePageManager extends PageManager {
     int i = 0; // mark the trace of b+ tree node
     while (curPage.getAsInternalPage() != null) {
       i++;
-      treeTrace[i] = curPage.getPageIndex();
-      curPage = 
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey));
+      cxt.treeTrace[i] = curPage.getPageIndex();
+      curPage = 
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey), cxt);
     }
-    treeTrace[0] = i; // bound in no.0 elem, points the parent the return
+    cxt.treeTrace[0] = i; // bound in no.0 elem, points to the parent of the returned page
 
     return getGlobalIndex(curPage.getPageIndex(), (short) 0);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
index 462c348b7fc..d1014550407 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
@@ -38,9 +38,10 @@ import java.util.Iterator;
  */
 public interface IPageManager {
 
-  void writeNewChildren(ICachedMNode parNode) throws MetadataException, 
IOException;
-
-  void writeUpdatedChildren(ICachedMNode parNode) throws MetadataException, 
IOException;
+  /**
+   * All changes will be an internal process, lock and dirty pages are now in
the scope of context.
+   */
+  void writeMNode(ICachedMNode node) throws MetadataException, IOException;
 
   void delete(ICachedMNode node) throws IOException, MetadataException;
 
@@ -51,8 +52,6 @@ public interface IPageManager {
 
   void clear() throws IOException, MetadataException;
 
-  void flushDirtyPages() throws IOException;
-
   void close() throws IOException;
 
   int getLastPageIndex();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index eb36b6d2be7..015bd38fb06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -44,19 +44,28 @@ import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile.getNodeAddress;
+import static 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile.getPageIndex;
+
 /**
  * Abstraction for various implementation of structure of pages. But 
multi-level index is hard-coded
  * within the framework since only alias works like secondary index now.
@@ -67,21 +76,16 @@ public abstract class PageManager implements IPageManager {
   protected static final Logger logger = 
LoggerFactory.getLogger(PageManager.class);
 
   protected final Map<Integer, ISchemaPage> pageInstCache;
-  protected final Map<Integer, ISchemaPage> dirtyPages;
+  // bucket for quick retrieval, only append when write operation finished
+  protected final PageIndexSortBuckets pageIndexBuckets;
 
-  // optimize retrieval of the smallest applicable DIRTY segmented page
-  // tiered by: MIN_SEG_SIZE, PAGE/16, PAGE/8, PAGE/4, PAGE/2, PAGE_SIZE
-  protected final LinkedList<Integer>[] tieredDirtyPageIndex =
-      new LinkedList[SchemaFileConfig.SEG_SIZE_LST.length];
+  protected final Lock cacheLock;
+  protected final Condition cacheFull;
 
-  protected final ReentrantLock evictLock;
-  protected final PageLocks pageLocks;
+  private final Map<Long, SchemaPageContext> threadContexts;
 
   protected final AtomicInteger lastPageIndex;
 
-  // shift to ThreadLocal if concurrent write expected
-  protected int[] treeTrace;
-
   private final FileChannel channel;
 
   // handle timeout interruption during reading
@@ -93,21 +97,20 @@ public abstract class PageManager implements IPageManager {
 
   // flush strategy is dependent on consensus protocol, only check protocol on 
init
   protected FlushPageStrategy flushDirtyPagesStrategy;
+  protected SinglePageFlushStrategy singlePageFlushStrategy;
 
   PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String 
logPath)
       throws IOException, MetadataException {
     this.pageInstCache =
         Collections.synchronizedMap(new 
LinkedHashMap<>(SchemaFileConfig.PAGE_CACHE_SIZE, 1, true));
-    this.dirtyPages = new ConcurrentHashMap<>();
-    for (int i = 0; i < tieredDirtyPageIndex.length; i++) {
-      tieredDirtyPageIndex[i] = new LinkedList<>();
-    }
+    this.pageIndexBuckets = new 
PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, pageInstCache);
+    this.threadContexts = new ConcurrentHashMap<>();
+
+    this.cacheLock = new ReentrantLock();
+    this.cacheFull = this.cacheLock.newCondition();
 
-    this.evictLock = new ReentrantLock();
-    this.pageLocks = new PageLocks();
     this.lastPageIndex =
         lastPageIndex >= 0 ? new AtomicInteger(lastPageIndex) : new 
AtomicInteger(0);
-    this.treeTrace = new int[16];
     this.channel = channel;
     this.pmtFile = pmtFile;
     this.readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
@@ -120,12 +123,14 @@ public abstract class PageManager implements IPageManager 
{
       logCounter = new AtomicInteger();
       logWriter = null;
       flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+      singlePageFlushStrategy = this::flushSinglePageWithoutLogging;
     } else {
       // without RATIS, utilize physical logging for integrity
       int pageAcc = (int) recoverFromLog(logPath) / 
SchemaFileConfig.PAGE_LENGTH;
       this.logWriter = new SchemaFileLogWriter(logPath);
       logCounter = new AtomicInteger(pageAcc);
       flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+      singlePageFlushStrategy = this::flushSinglePageWithLogging;
     }
 
     // construct first page if file to init
@@ -134,7 +139,8 @@ public abstract class PageManager implements IPageManager {
           
ISchemaPage.initSegmentedPage(ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
 0);
       rootPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
       pageInstCache.put(rootPage.getPageIndex(), rootPage);
-      markDirty(rootPage);
+      rootPage.syncPageBuffer();
+      rootPage.flushPageToChannel(channel);
     }
   }
 
@@ -161,11 +167,148 @@ public abstract class PageManager implements 
IPageManager {
     return 0L;
   }
 
-  // region Framework Methods
+  /**
+   * A rough cache size guardian, all threads that passed this entrant check will 
not be limited with
+   * cache size anymore. TODO A better guardian is based on constraint per 
thread.
+   */
+  protected void cacheGuardian() {
+    cacheLock.lock();
+    try {
+      while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+        try {
+          // try to evict by LRU
+          Iterator<ISchemaPage> iterator = pageInstCache.values().iterator();
+          int pageSizeLimit = SchemaFileConfig.PAGE_CACHE_SIZE, size = 
pageInstCache.size();
+
+          ISchemaPage p;
+          while (iterator.hasNext()) {
+            p = iterator.next();
+
+            if (size <= pageSizeLimit) {
+              break;
+            }
+
+            if (p.getRefCnt().get() == 0) {
+              iterator.remove();
+              size--;
+            }
+          }
+
+          if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+            // wait until another operation finished and released pages
+            cacheFull.await();
+          }
+        } catch (InterruptedException e) {
+          logger.warn(
+              "Interrupted during page cache eviction. Consider increasing 
cache size, "
+                  + "reducing concurrency, or extending timeout");
+        }
+      }
+    } finally {
+      cacheLock.unlock();
+    }
+  }
+
   @Override
-  public void writeNewChildren(ICachedMNode node) throws MetadataException, 
IOException {
+  public void writeMNode(ICachedMNode node) throws MetadataException, 
IOException {
+    SchemaPageContext cxt = new SchemaPageContext();
+    threadContexts.put(Thread.currentThread().getId(), cxt);
+    cacheGuardian();
+    entrantLock(node, cxt);
+    try {
+      writeNewChildren(node, cxt);
+      writeUpdatedChildren(node, cxt);
+      flushDirtyPages(cxt);
+    } finally {
+      releaseLocks(cxt);
+      releaseReferent(cxt);
+      threadContexts.remove(Thread.currentThread().getId(), cxt);
+    }
+  }
+
+  /** Context only tracks write locks, so they shall be released here. */
+  protected void releaseLocks(SchemaPageContext cxt) throws IOException, 
MetadataException {
+    for (int i : cxt.lockTraces) {
+      pageInstCache.get(i).getLock().writeLock().unlock();
+    }
+  }
+
+  /** Release referents and evict likely-useless pages if necessary. */
+  protected void releaseReferent(SchemaPageContext cxt) throws IOException, 
MetadataException {
+    for (ISchemaPage p : cxt.referredPages.values()) {
+      p.getRefCnt().decrementAndGet();
+    }
+
+    if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+      cacheLock.lock();
+      try {
+        for (ISchemaPage p : cxt.referredPages.values()) {
+          if (p.getRefCnt().get() == 0) {
+            pageInstCache.remove(p.getPageIndex());
+          }
+        }
+
+        if (pageInstCache.size() <= SchemaFileConfig.PAGE_CACHE_SIZE) {
+          cacheFull.signal();
+        }
+      } finally {
+        cacheLock.unlock();
+      }
+    }
+  }
+
+  /** Lock pages in ascending page-index order to avoid deadlock. */
+  protected void entrantLock(ICachedMNode node, SchemaPageContext cxt)
+      throws IOException, MetadataException {
+    int initPageIndex = getPageIndex(getNodeAddress(node));
+    ISchemaPage page;
+
+    if (node.isDatabase()) {
+      if (initPageIndex < 0) {
+        // node is using template
+        return;
+      }
+
+      page = getPageInstance(initPageIndex, cxt);
+      page.getLock().writeLock().lock();
+      cxt.traceLock(page);
+      cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+    } else {
+      int parIndex = getPageIndex(getNodeAddress(node.getParent()));
+
+      int minPageIndex = Math.min(initPageIndex, parIndex);
+      int maxPageIndex = Math.max(initPageIndex, parIndex);
+
+      // as InternalPage will not transplant, its parent pointer needs no lock
+      page = getPageInstance(initPageIndex, cxt);
+      if (page.getAsInternalPage() != null) {
+        page.getLock().writeLock().lock();
+        cxt.traceLock(page);
+        cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+        return;
+      }
+
+      if (minPageIndex > 0) {
+        page = getPageInstance(minPageIndex, cxt);
+        page.getLock().writeLock().lock();
+        cxt.traceLock(page);
+        cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+      }
+
+      if (minPageIndex != maxPageIndex && maxPageIndex > 0) {
+        page = getPageInstance(maxPageIndex, cxt);
+        page.getLock().writeLock().lock();
+        cxt.traceLock(page);
+        cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+      }
+    }
+  }
+
+  // region Framework Methods
+  private void writeNewChildren(ICachedMNode node, SchemaPageContext cxt)
+      throws MetadataException, IOException {
     int subIndex;
-    long curSegAddr = SchemaFile.getNodeAddress(node);
+    long curSegAddr = getNodeAddress(node);
     long actualAddress; // actual segment to write record
     ICachedMNode child;
     ISchemaPage curPage;
@@ -181,7 +324,7 @@ public abstract class PageManager implements IPageManager {
       if (!child.isMeasurement()) {
         alias = null;
 
-        if (SchemaFile.getNodeAddress(child) >= 0) {
+        if (getNodeAddress(child) >= 0) {
           // new child with a valid segment address, weird
           throw new MetadataException(
               String.format(
@@ -192,7 +335,7 @@ public abstract class PageManager implements IPageManager {
         // pre-allocate except that child is a device node using template
         if (!(child.isDevice() && child.getAsDeviceMNode().isUseTemplate())) {
           short estSegSize = estimateSegmentSize(child);
-          long glbIndex = preAllocateSegment(estSegSize);
+          long glbIndex = preAllocateSegment(estSegSize, cxt);
           SchemaFile.setNodeAddress(child, glbIndex);
         }
       } else {
@@ -204,34 +347,34 @@ public abstract class PageManager implements IPageManager 
{
 
       // prepare buffer to write
       childBuffer = RecordUtils.node2Buffer(child);
-      actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey());
-      curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress));
+      actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
+      curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
 
       try {
         curPage
             .getAsSegmentedPage()
             .write(SchemaFile.getSegIndex(actualAddress), entry.getKey(), 
childBuffer);
-        markDirty(curPage);
-        addPageToCache(curPage.getPageIndex(), curPage);
+        interleavedFlush(curPage, cxt);
+        cxt.markDirty(curPage);
 
-        subIndex = subIndexRootPage(curSegAddr);
+        subIndex = subIndexRootPage(curSegAddr, cxt);
         if (alias != null && subIndex >= 0) {
-          insertSubIndexEntry(subIndex, alias, entry.getKey());
+          insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
         }
 
       } catch (SchemaPageOverflowException e) {
         if 
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
             == SchemaFileConfig.SEG_MAX_SIZ) {
           // curPage might be replaced so unnecessary to mark it here
-          multiPageInsertOverflowOperation(curPage, entry.getKey(), 
childBuffer);
+          multiPageInsertOverflowOperation(curPage, entry.getKey(), 
childBuffer, cxt);
 
-          subIndex = subIndexRootPage(curSegAddr);
+          subIndex = subIndexRootPage(curSegAddr, cxt);
           if (node.isDevice() && subIndex < 0) {
             // the record occurred overflow had been inserted already
-            buildSubIndex(node);
+            buildSubIndex(node, cxt);
           } else if (alias != null) {
             // implied node is entity, so sub index must exist
-            insertSubIndexEntry(subIndex, alias, entry.getKey());
+            insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
           }
         } else {
           // transplant and delete former segment
@@ -243,27 +386,30 @@ public abstract class PageManager implements IPageManager 
{
                       .getNewChildBuffer()
                       .entrySet()
                       .size());
-          ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSize);
+          ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSize, 
cxt);
 
           // with single segment, curSegAddr equals actualAddress
           curSegAddr =
               newPage.transplantSegment(curPage.getAsSegmentedPage(), 
actSegId, newSegSize);
           newPage.write(SchemaFile.getSegIndex(curSegAddr), entry.getKey(), 
childBuffer);
           curPage.getAsSegmentedPage().deleteSegment(actSegId);
+
+          // entrant lock guarantees thread-safety
           SchemaFile.setNodeAddress(node, curSegAddr);
-          updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
-          markDirty(curPage);
-          addPageToCache(curPage.getPageIndex(), curPage);
+          updateParentalRecord(node.getParent(), node.getName(), curSegAddr, 
cxt);
+
+          // newPage is marked dirty and referenced within the getMinAppl method
+          cxt.markDirty(curPage);
         }
       }
     }
   }
 
-  @Override
-  public void writeUpdatedChildren(ICachedMNode node) throws 
MetadataException, IOException {
+  private void writeUpdatedChildren(ICachedMNode node, SchemaPageContext cxt)
+      throws MetadataException, IOException {
     boolean removeOldSubEntry = false, insertNewSubEntry = false;
     int subIndex;
-    long curSegAddr = SchemaFile.getNodeAddress(node);
+    long curSegAddr = getNodeAddress(node);
     long actualAddress; // actual segment to write record
     String alias, oldAlias; // key of the sub-index entry now
     ICachedMNode child, oldChild;
@@ -272,10 +418,10 @@ public abstract class PageManager implements IPageManager 
{
     for (Map.Entry<String, ICachedMNode> entry :
         
ICachedMNodeContainer.getCachedMNodeContainer(node).getUpdatedChildBuffer().entrySet())
 {
       child = entry.getValue();
-      actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey());
+      actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
       childBuffer = RecordUtils.node2Buffer(child);
 
-      curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress));
+      curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
       if 
(curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), 
entry.getKey())
           == null) {
         throw new MetadataException(
@@ -319,34 +465,34 @@ public abstract class PageManager implements IPageManager 
{
         curPage
             .getAsSegmentedPage()
             .update(SchemaFile.getSegIndex(actualAddress), entry.getKey(), 
childBuffer);
-        markDirty(curPage);
-        addPageToCache(curPage.getPageIndex(), curPage);
+        interleavedFlush(curPage, cxt);
+        cxt.markDirty(curPage);
 
-        subIndex = subIndexRootPage(curSegAddr);
+        subIndex = subIndexRootPage(curSegAddr, cxt);
         if (subIndex >= 0) {
           if (removeOldSubEntry) {
-            removeSubIndexEntry(subIndex, oldAlias);
+            removeSubIndexEntry(subIndex, oldAlias, cxt);
           }
 
           if (insertNewSubEntry) {
-            insertSubIndexEntry(subIndex, alias, entry.getKey());
+            insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
           }
         }
       } catch (SchemaPageOverflowException e) {
         if 
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
             == SchemaFileConfig.SEG_MAX_SIZ) {
-          multiPageUpdateOverflowOperation(curPage, entry.getKey(), 
childBuffer);
+          multiPageUpdateOverflowOperation(curPage, entry.getKey(), 
childBuffer, cxt);
 
-          subIndex = subIndexRootPage(curSegAddr);
+          subIndex = subIndexRootPage(curSegAddr, cxt);
           if (node.isDevice() && subIndex < 0) {
-            buildSubIndex(node);
+            buildSubIndex(node, cxt);
           } else if (insertNewSubEntry || removeOldSubEntry) {
             if (removeOldSubEntry) {
-              removeSubIndexEntry(subIndex, oldAlias);
+              removeSubIndexEntry(subIndex, oldAlias, cxt);
             }
 
             if (insertNewSubEntry) {
-              insertSubIndexEntry(subIndex, alias, entry.getKey());
+              insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
             }
           }
         } else {
@@ -355,7 +501,7 @@ public abstract class PageManager implements IPageManager {
           short newSegSiz =
               reEstimateSegSize(
                   curPage.getAsSegmentedPage().getSegmentSize(actSegId) + 
childBuffer.capacity());
-          ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSiz);
+          ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSiz, 
cxt);
 
           // assign new segment address
           curSegAddr = newPage.transplantSegment(curPage.getAsSegmentedPage(), 
actSegId, newSegSiz);
@@ -363,9 +509,8 @@ public abstract class PageManager implements IPageManager {
 
           newPage.update(SchemaFile.getSegIndex(curSegAddr), entry.getKey(), 
childBuffer);
           SchemaFile.setNodeAddress(node, curSegAddr);
-          updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
-          markDirty(curPage);
-          addPageToCache(curPage.getPageIndex(), curPage);
+          updateParentalRecord(node.getParent(), node.getName(), curSegAddr, 
cxt);
+          cxt.markDirty(curPage);
         }
       }
     }
@@ -374,8 +519,8 @@ public abstract class PageManager implements IPageManager {
   // endregion
 
   // region Abstract and Overridable Methods
-  protected abstract long getTargetSegmentAddress(long curAddr, String recKey)
-      throws IOException, MetadataException;
+  protected abstract long getTargetSegmentAddress(
+      long curAddr, String recKey, SchemaPageContext cxt) throws IOException, 
MetadataException;
 
   /**
    * Deal with split, transplant and index update about the overflow occurred 
by insert.
@@ -385,7 +530,7 @@ public abstract class PageManager implements IPageManager {
    * @param childBuffer content of the key
    */
   protected abstract void multiPageInsertOverflowOperation(
-      ISchemaPage curPage, String key, ByteBuffer childBuffer)
+      ISchemaPage curPage, String key, ByteBuffer childBuffer, 
SchemaPageContext cxt)
       throws MetadataException, IOException;
 
   /**
@@ -394,7 +539,7 @@ public abstract class PageManager implements IPageManager {
    * @return the top internal page
    */
   protected abstract void multiPageUpdateOverflowOperation(
-      ISchemaPage curPage, String key, ByteBuffer childBuffer)
+      ISchemaPage curPage, String key, ByteBuffer childBuffer, 
SchemaPageContext cxt)
       throws MetadataException, IOException;
 
   /**
@@ -402,7 +547,8 @@ public abstract class PageManager implements IPageManager {
    *
    * @param parNode node needs to build subordinate index.
    */
-  protected abstract void buildSubIndex(ICachedMNode parNode) throws 
MetadataException, IOException;
+  protected abstract void buildSubIndex(ICachedMNode parNode, 
SchemaPageContext cxt)
+      throws MetadataException, IOException;
 
   /**
    * Insert an entry of subordinate index of the target node.
@@ -411,68 +557,127 @@ public abstract class PageManager implements 
IPageManager {
    * @param key key of the sub-index entry.
    * @param rec value of the sub-index entry.
    */
-  protected abstract void insertSubIndexEntry(int base, String key, String rec)
+  protected abstract void insertSubIndexEntry(
+      int base, String key, String rec, SchemaPageContext cxt)
       throws MetadataException, IOException;
 
-  protected abstract void removeSubIndexEntry(int base, String oldAlias)
+  protected abstract void removeSubIndexEntry(int base, String oldAlias, 
SchemaPageContext cxt)
       throws MetadataException, IOException;
 
-  protected abstract String searchSubIndexAlias(int base, String alias)
+  protected abstract String searchSubIndexAlias(int base, String alias, 
SchemaPageContext cxt)
       throws MetadataException, IOException;
 
   // endregion
 
-  // region General Interfaces
-  @Override
-  public int getLastPageIndex() {
-    return lastPageIndex.get();
+  // region Flush Strategy
+  @FunctionalInterface
+  interface FlushPageStrategy {
+    void apply(List<ISchemaPage> dirtyPages) throws IOException;
   }
 
   @FunctionalInterface
-  interface FlushPageStrategy {
-    void apply() throws IOException;
+  interface SinglePageFlushStrategy {
+    void apply(ISchemaPage page) throws IOException;
   }
 
-  private void flushDirtyPagesWithLogging() throws IOException {
+  private void flushDirtyPagesWithLogging(List<ISchemaPage> dirtyPages) throws 
IOException {
+    if (dirtyPages.size() == 0) {
+      return;
+    }
+
     if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
       logWriter = logWriter.renew();
       logCounter.set(0);
     }
 
     logCounter.addAndGet(dirtyPages.size());
-    for (ISchemaPage page : dirtyPages.values()) {
+    for (ISchemaPage page : dirtyPages) {
       page.syncPageBuffer();
       logWriter.write(page);
     }
     logWriter.prepare();
 
-    for (ISchemaPage page : dirtyPages.values()) {
+    for (ISchemaPage page : dirtyPages) {
       page.flushPageToChannel(channel);
     }
     logWriter.commit();
   }
 
-  private void flushDirtyPagesWithoutLogging() throws IOException {
-    for (ISchemaPage page : dirtyPages.values()) {
+  private void flushSinglePageWithLogging(ISchemaPage page) throws IOException 
{
+    if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
+      logWriter = logWriter.renew();
+      logCounter.set(0);
+    }
+
+    logCounter.addAndGet(1);
+    page.syncPageBuffer();
+    logWriter.write(page);
+    logWriter.prepare();
+    page.flushPageToChannel(channel);
+    logWriter.commit();
+  }
+
+  private void flushDirtyPagesWithoutLogging(List<ISchemaPage> dirtyPages) 
throws IOException {
+    for (ISchemaPage page : dirtyPages) {
       page.syncPageBuffer();
       page.flushPageToChannel(channel);
     }
   }
 
-  @Override
-  public void flushDirtyPages() throws IOException {
-    if (dirtyPages.size() == 0) {
+  private void flushSinglePageWithoutLogging(ISchemaPage page) throws 
IOException {
+    page.syncPageBuffer();
+    page.flushPageToChannel(channel);
+  }
+
+  public synchronized void flushDirtyPages(SchemaPageContext cxt) throws 
IOException {
+    if (cxt.dirtyCnt == 0) {
       return;
     }
-    flushDirtyPagesStrategy.apply();
-    dirtyPages.clear();
-    Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
+    flushDirtyPagesStrategy.apply(
+        cxt.referredPages.values().stream()
+            .filter(ISchemaPage::isDirtyPage)
+            .collect(Collectors.toList()));
+    cxt.appendBucketIndex(pageIndexBuckets);
+  }
+
+  /**
+   * When a page prepares to insert, compare it with the lastLeaf from the context. If 
not the same, then flush
+   * the lastLeaf, unlock, deref, and remove it from dirtyPages. lastLeafPage 
is only initiated at
+   * overflowOperation.
+   */
+  private void interleavedFlush(ISchemaPage page, SchemaPageContext cxt) 
throws IOException {
+    if (cxt.lastLeafPage == null || cxt.lastLeafPage.getPageIndex() == 
page.getPageIndex()) {
+      return;
+    }
+    cxt.interleavedFlushCnt++;
+    singlePageFlushStrategy.apply(cxt.lastLeafPage);
+    // this lastLeaf shall only be locked once
+    cxt.dirtyCnt--;
+
+    // unlock and deref the page from context
+    if (cxt.lockTraces.contains(cxt.lastLeafPage.getPageIndex())) {
+      cxt.lastLeafPage.getLock().writeLock().unlock();
+      cxt.lockTraces.remove(cxt.lastLeafPage.getPageIndex());
+    }
+    cxt.lastLeafPage.getRefCnt().decrementAndGet();
+
+    // can be reclaimed since the page only referred by pageInstCache
+    cxt.referredPages.remove(cxt.lastLeafPage.getPageIndex());
+    // alleviate eviction pressure
+    pageInstCache.remove(cxt.lastLeafPage);
+
+    cxt.lastLeafPage = page.getAsSegmentedPage();
+  }
+  // endregion
+
+  // region General Interfaces
+  @Override
+  public int getLastPageIndex() {
+    return lastPageIndex.get();
   }
 
   @Override
   public void clear() throws IOException, MetadataException {
-    dirtyPages.clear();
-    Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
     pageInstCache.clear();
     lastPageIndex.set(0);
     logWriter = logWriter == null ? null : logWriter.renew();
@@ -480,9 +685,10 @@ public abstract class PageManager implements IPageManager {
 
   @Override
   public void inspect(PrintWriter pw) throws IOException, MetadataException {
+    SchemaPageContext cxt = new SchemaPageContext();
     String pageContent;
     for (int i = 0; i <= lastPageIndex.get(); i++) {
-      pageContent = getPageInstance(i).inspect();
+      pageContent = getPageInstance(i, cxt).inspect();
       pw.print("---------------------\n");
       pw.print(pageContent);
       pw.print("\n");
@@ -500,182 +706,113 @@ public abstract class PageManager implements 
IPageManager {
 
   // region Page Access Management
 
-  public ISchemaPage getPageInstance(int pageIdx) throws IOException, 
MetadataException {
+  /** Any page returned will be pinned/referred by the cxt. * */
+  public ISchemaPage getPageInstance(int pageIdx, SchemaPageContext cxt)
+      throws IOException, MetadataException {
     if (pageIdx > lastPageIndex.get()) {
       throw new MetadataException(String.format("Page index %d out of range.", 
pageIdx));
     }
 
-    pageLocks.readLock(pageIdx);
-    try {
-      if (dirtyPages.containsKey(pageIdx)) {
-        return dirtyPages.get(pageIdx);
-      }
-
-      if (pageInstCache.containsKey(pageIdx)) {
-        return pageInstCache.get(pageIdx);
-      }
-    } finally {
-      pageLocks.readUnlock(pageIdx);
+    // just return from (thread local) context
+    if (cxt != null && cxt.referredPages.containsKey(pageIdx)) {
+      return cxt.referredPages.get(pageIdx);
     }
 
+    // lock for no duplicate page with same index from disk, and guarantees 
page will not be evicted
+    //  by other thread before referred by current thread
+    cacheLock.lock();
     try {
-      pageLocks.writeLock(pageIdx);
+      ISchemaPage page = pageInstCache.get(pageIdx);
+      if (page != null) {
+        cxt.refer(page);
+        return page;
+      }
 
       ByteBuffer newBuf = ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
-
       loadFromFile(newBuf, pageIdx);
-      return addPageToCache(pageIdx, ISchemaPage.loadSchemaPage(newBuf));
+      page = ISchemaPage.loadSchemaPage(newBuf);
+      cxt.refer(page);
+      addPageToCache(page);
+      return page;
     } finally {
-      pageLocks.writeUnlock(pageIdx);
+      cacheLock.unlock();
     }
   }
 
-  private long preAllocateSegment(short size) throws IOException, 
MetadataException {
-    ISegmentedPage page = getMinApplSegmentedPageInMem(size);
+  private long preAllocateSegment(short size, SchemaPageContext cxt)
+      throws IOException, MetadataException {
+    ISegmentedPage page = getMinApplSegmentedPageInMem(size, cxt);
     return SchemaFile.getGlobalIndex(page.getPageIndex(), 
page.allocNewSegment(size));
   }
 
-  protected ISchemaPage replacePageInCache(ISchemaPage page) {
-    markDirty(page);
-    return addPageToCache(page.getPageIndex(), page);
+  protected ISchemaPage replacePageInCache(ISchemaPage page, SchemaPageContext 
cxt) {
+    // no need to lock since the root of B+Tree is locked
+    cxt.markDirty(page, true);
+    addPageToCache(page);
+    return page;
   }
 
   /**
-   * Accessing dirtyPages will be expedited, while the retrieval from 
pageInstCache remains
-   * unaffected due to its limited capacity.
+   * Only accessed during write operation and every page returned is due to be 
modified thus they
+   * are all marked dirty.
    *
    * @param size size of the expected segment
+   * @return always write locked
    */
-  protected ISegmentedPage getMinApplSegmentedPageInMem(short size) {
-    ISchemaPage targetPage = null;
-    int tierLoopCnt = 0;
-    for (int i = 0; i < tieredDirtyPageIndex.length && dirtyPages.size() > 0; 
i++) {
-      tierLoopCnt = tieredDirtyPageIndex[i].size();
-      while (size < SchemaFileConfig.SEG_SIZE_LST[i] && tierLoopCnt > 0) {
-        targetPage = dirtyPages.get(tieredDirtyPageIndex[i].pop());
-        tierLoopCnt--;
-
-        //  check validity of the retrieved targetPage, as the page type may 
have changed,
-        //   e.g., from SegmentedPage to InternalPage, or index could be stale
-        if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
-          // invalid index for SegmentedPage, drop the index and get next
-          continue;
-        }
-
-        // suitable page for requested size
-        if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
-          sortSegmentedIntoIndex(targetPage, size);
-          return targetPage.getAsSegmentedPage();
-        }
-
-        // not large enough but legal for another retrieval
-        if 
(targetPage.getAsSegmentedPage().isCapableForSegSize(SchemaFileConfig.SEG_SIZE_LST[i]))
 {
-          tieredDirtyPageIndex[i].add(targetPage.getPageIndex());
-        }
-      }
+  protected ISegmentedPage getMinApplSegmentedPageInMem(short size, 
SchemaPageContext cxt) {
+    // pages retrieved from context is unnecessary and inefficient to lock
+    ISchemaPage targetPage = cxt.indexBuckets.getNearestFitPage(size, false);
+    if (targetPage != null) {
+      cxt.indexBuckets.sortIntoBucket(targetPage, size);
+      return targetPage.getAsSegmentedPage();
     }
 
-    // TODO refactor design related to pageInstCache to index its pages in 
further development
-    for (Map.Entry<Integer, ISchemaPage> entry : pageInstCache.entrySet()) {
-      if (entry.getValue().getAsSegmentedPage() != null
-          && entry.getValue().getAsSegmentedPage().isCapableForSegSize(size)) {
-        markDirty(entry.getValue());
-        return pageInstCache.get(entry.getKey()).getAsSegmentedPage();
-      }
-    }
-    return allocateNewSegmentedPage().getAsSegmentedPage();
-  }
+    // pageIndexBuckets sorts pages within pageInstCache into buckets to 
expedite access
+    targetPage = pageIndexBuckets.getNearestFitPage(size, true);
+    if (targetPage != null) {
+      cxt.markDirty(targetPage);
+      cxt.traceLock(targetPage);
 
-  /**
-   * Index SegmentedPage inside {@linkplain #dirtyPages} into tiered list by 
{@linkplain
-   * SchemaFileConfig#SEG_SIZE_LST}.
-   *
-   * <p>The level of its index depends on its AVAILABLE space.
-   *
-   * @param page SegmentedPage to be indexed, no guardian statements since all 
entrances are secured
-   *     for now
-   * @param newSegSize to re-integrate after a retrieval, the expected 
overhead shall be considered.
-   *     -1 for common dirty mark.
-   */
-  protected void sortSegmentedIntoIndex(ISchemaPage page, short newSegSize) {
-    // actual space occupied by a segment includes both its own length and the 
length of its offset.
-    // so available length for a segment is the spareSize minus the offset 
length
-    short availableSize =
-        newSegSize < 0
-            ? (short) (page.getAsSegmentedPage().getSpareSize() - 
SchemaFileConfig.SEG_OFF_DIG)
-            : (short)
-                (page.getAsSegmentedPage().getSpareSize()
-                    - newSegSize
-                    - SchemaFileConfig.SEG_OFF_DIG);
-
-    // too small to index
-    if (availableSize < SchemaFileConfig.SEG_HEADER_SIZE) {
-      return;
+      // transfer the page from pageIndexBuckets to cxt.buckets so it will not be 
accessed by other WRITE
+      // threads
+      cxt.indexBuckets.sortIntoBucket(targetPage, size);
+      return targetPage.getAsSegmentedPage();
     }
 
-    // index range like: SEG_HEADER_SIZE <= [0] < SEG_SIZE_LST[0], ...
-    for (int i = 0; i < SchemaFileConfig.SEG_SIZE_LST.length; i++) {
-      // the last of SEG_SIZE_LST is the maximum page size, definitely larger 
than others
-      if (availableSize < SchemaFileConfig.SEG_SIZE_LST[i]) {
-        tieredDirtyPageIndex[i].add(page.getPageIndex());
-        return;
-      }
-    }
+    // due to be dirty thus its index only sorted into local buckets
+    targetPage = allocNewSegmentedPage(cxt);
+    cxt.indexBuckets.sortIntoBucket(targetPage, size);
+    return targetPage.getAsSegmentedPage();
   }
 
-  protected synchronized ISchemaPage allocateNewSegmentedPage() {
-    lastPageIndex.incrementAndGet();
+  protected ISchemaPage allocNewSegmentedPage(SchemaPageContext cxt) {
     ISchemaPage newPage =
         ISchemaPage.initSegmentedPage(
-            ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH), 
lastPageIndex.get());
-    markDirty(newPage);
-    return addPageToCache(newPage.getPageIndex(), newPage);
+            ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH), 
lastPageIndex.incrementAndGet());
+    // lock and mark to be consistent with # getMinApplSegmentedPageInMem
+    cxt.markDirty(newPage);
+    newPage.getLock().writeLock().lock();
+    cxt.traceLock(newPage);
+    addPageToCache(newPage);
+    return newPage;
   }
 
-  protected synchronized ISchemaPage registerAsNewPage(ISchemaPage page) {
+  protected ISchemaPage registerAsNewPage(ISchemaPage page, SchemaPageContext 
cxt) {
+    // new page will be LOCAL until the creating thread finished
+    //  thus not added to sparsePageIndex now
     page.setPageIndex(lastPageIndex.incrementAndGet());
-    markDirty(page);
-    return addPageToCache(page.getPageIndex(), page);
-  }
-
-  protected void markDirty(ISchemaPage page) {
-    page.markDirty();
-    dirtyPages.put(page.getPageIndex(), page);
-
-    if (page.getAsSegmentedPage() != null) {
-      sortSegmentedIntoIndex(page, (short) -1);
-    }
+    addPageToCache(page);
+    cxt.markDirty(page);
+    return page;
   }
 
-  protected ISchemaPage addPageToCache(int pageIndex, ISchemaPage page) {
-    pageInstCache.put(pageIndex, page);
-    // only one thread evicts and flushes pages
-    if (evictLock.tryLock()) {
-      try {
-        if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
-          int removeCnt =
-              (int) (0.2 * pageInstCache.size()) > 0 ? (int) (0.2 * 
pageInstCache.size()) : 1;
-          List<Integer> rmvIds = new 
ArrayList<>(pageInstCache.keySet()).subList(0, removeCnt);
-
-          for (Integer id : rmvIds) {
-            // dirty pages only flushed from dirtyPages
-            if (pageLocks.findLock(id).writeLock().tryLock()) {
-              try {
-                pageInstCache.remove(id);
-              } finally {
-                pageLocks.findLock(id).writeLock().unlock();
-              }
-            }
-          }
-        }
-      } finally {
-        evictLock.unlock();
-      }
-    }
-    return page;
+  protected ISchemaPage addPageToCache(ISchemaPage page) {
+    // size control is left to operation entrance
+    // return value could use to assess whether key already existed
+    return this.pageInstCache.put(page.getPageIndex(), page);
   }
 
-  private synchronized int loadFromFile(ByteBuffer dst, int pageIndex) throws 
IOException {
+  private int loadFromFile(ByteBuffer dst, int pageIndex) throws IOException {
     dst.clear();
     if (!readChannel.isOpen()) {
       readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
@@ -683,24 +820,26 @@ public abstract class PageManager implements IPageManager 
{
     return readChannel.read(dst, getPageAddress(pageIndex));
   }
 
-  private void updateParentalRecord(ICachedMNode parent, String key, long 
newSegAddr)
+  private void updateParentalRecord(
+      ICachedMNode parent, String key, long newSegAddr, SchemaPageContext cxt)
       throws IOException, MetadataException {
     if (parent == null || parent.getChild(key).isDatabase()) {
       throw new MetadataException("Root page shall not be migrated.");
     }
-    long parSegAddr = parent.getParent() == null ? 0L : 
SchemaFile.getNodeAddress(parent);
-    parSegAddr = getTargetSegmentAddress(parSegAddr, key);
-    ISchemaPage page = getPageInstance(SchemaFile.getPageIndex(parSegAddr));
+    long parSegAddr = parent.getParent() == null ? 0L : getNodeAddress(parent);
+    parSegAddr = getTargetSegmentAddress(parSegAddr, key, cxt);
+    ISchemaPage page = getPageInstance(SchemaFile.getPageIndex(parSegAddr), 
cxt);
     ((SegmentedPage) 
page).updateRecordSegAddr(SchemaFile.getSegIndex(parSegAddr), key, newSegAddr);
-    markDirty(page);
+    cxt.markDirty(page);
   }
 
   // endregion
 
   // region Inner Utilities
 
-  private int subIndexRootPage(long addr) throws IOException, 
MetadataException {
-    return getPageInstance(SchemaFile.getPageIndex(addr)).getSubIndex();
+  private int subIndexRootPage(long addr, SchemaPageContext cxt)
+      throws IOException, MetadataException {
+    return getPageInstance(SchemaFile.getPageIndex(addr), cxt).getSubIndex();
   }
 
   private static long getPageAddress(int pageIndex) {
@@ -755,7 +894,7 @@ public abstract class PageManager implements IPageManager {
    * here. Supposed to merge with SchemaFile#reEstimateSegSize.
    *
    * @param expSize expected size calculated from next new record
-   * @param batchSize size of children within one {@linkplain 
#writeNewChildren(ICachedMNode)}
+   * @param batchSize size of children within one 
#writeNewChildren(ICachedMNode)
    * @return estimated size
    * @throws MetadataException
    */
@@ -798,54 +937,228 @@ public abstract class PageManager implements 
IPageManager {
 
   // region TestOnly Methods
 
+  @TestOnly
+  public String checkAllContexts() {
+    StringBuilder builder = new StringBuilder();
+
+    for (SchemaPageContext cxt : threadContexts.values()) {
+      builder.append(
+          String.format(
+              "cxt[%d] has %d pages, %d lock trace\n",
+              cxt.threadID, cxt.referredPages.size(), cxt.lockTraces.size()));
+    }
+
+    return builder.toString();
+  }
+
+  @TestOnly
+  public static String checkContextLock(SchemaPageContext cxt) {
+    StringBuilder builder = new StringBuilder();
+
+    for (ISchemaPage page : cxt.referredPages.values()) {
+      builder.append(
+          String.format(
+              "page:%d, wl:%d, rl:%d, trace:%b, dirty:%b;\n",
+              page.getPageIndex(),
+              ((ReentrantReadWriteLock) page.getLock()).getWriteHoldCount(),
+              ((ReentrantReadWriteLock) page.getLock()).getReadLockCount(),
+              cxt.lockTraces.contains(page.getPageIndex()),
+              page.isDirtyPage()));
+    }
+    return builder.toString();
+  }
+
   @TestOnly
   public long getTargetSegmentAddressOnTest(long curSegAddr, String recKey)
       throws IOException, MetadataException {
-    return getTargetSegmentAddress(curSegAddr, recKey);
+    return getTargetSegmentAddress(curSegAddr, recKey, new 
SchemaPageContext());
   }
 
   @TestOnly
   public ISchemaPage getPageInstanceOnTest(int pageIdx) throws IOException, 
MetadataException {
-    return getPageInstance(pageIdx);
+    SchemaPageContext cxt = new SchemaPageContext();
+    return getPageInstance(pageIdx, cxt);
   }
 
   // endregion
 
-  private static class PageLocks {
-    /**
-     * number of reentrant read write lock. Notice that this number should be 
a prime number for
-     * uniform hash
-     */
-    private static final int NUM_OF_LOCKS = 1039;
+  /** Thread local variables about write/update process. */
+  protected static class SchemaPageContext {
+    final long threadID;
+    final PageIndexSortBuckets indexBuckets;
+    // locked and dirty pages are all referred pages, they all reside in page 
cache
+    final Map<Integer, ISchemaPage> referredPages;
+    final Set<Integer> lockTraces;
+    // track B+Tree traversal trace
+    final int[] treeTrace;
+    int dirtyCnt;
+    int interleavedFlushCnt;
+
+    // flush B+Tree leaf before operation finished since all records are 
ordered
+    ISegmentedPage lastLeafPage;
+
+    public SchemaPageContext() {
+      threadID = Thread.currentThread().getId();
+      referredPages = new HashMap<>();
+      indexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, 
referredPages);
+      treeTrace = new int[16];
+      lockTraces = new HashSet<>();
+      lastLeafPage = null;
+      dirtyCnt = 0;
+      interleavedFlushCnt = 0;
+    }
+
+    public void markDirty(ISchemaPage page) {
+      markDirty(page, false);
+    }
 
-    /** locks array */
-    private final ReentrantReadWriteLock[] locks;
+    private void markDirty(ISchemaPage page, boolean forceReplace) {
+      if (!page.isDirtyPage()) {
+        dirtyCnt++;
+      }
+      page.setDirtyFlag();
+      refer(page);
+      if (forceReplace && referredPages.containsKey(page.getPageIndex())) {
 
-    protected PageLocks() {
-      locks = new ReentrantReadWriteLock[NUM_OF_LOCKS];
-      for (int i = 0; i < NUM_OF_LOCKS; i++) {
-        locks[i] = new ReentrantReadWriteLock();
+        // previous page is dirty, so it's not a new dirty page
+        if (referredPages.get(page.getPageIndex()).isDirtyPage()) {
+          dirtyCnt--;
+        }
+        // force to replace
+        referredPages.put(page.getPageIndex(), page);
       }
     }
 
-    public void readLock(int hash) {
-      findLock(hash).readLock().lock();
+    private void traceLock(ISchemaPage page) {
+      refer(page);
+      lockTraces.add(page.getPageIndex());
     }
 
-    public void readUnlock(int hash) {
-      findLock(hash).readLock().unlock();
+    // referred pages will not be evicted until operation finished
+    private void refer(ISchemaPage page) {
+      if (!referredPages.containsKey(page.getPageIndex())) {
+        page.getRefCnt().incrementAndGet();
+        referredPages.put(page.getPageIndex(), page);
+      }
+    }
+
+    /**
+     * Since records are ordered for write operation, it is reasonable to 
flush those left siblings
+     * of the active leaf page. The target page would be initiated at the 
first split within the
+     * operation.
+     *
+     * @param page left leaf of the split
+     */
+    public void invokeLastLeaf(ISchemaPage page) {
+      // only record at the first split
+      if (lastLeafPage == null) {
+        lastLeafPage = page.getAsSegmentedPage();
+      }
     }
 
-    public void writeLock(int hash) {
-      findLock(hash).writeLock().lock();
+    private void appendBucketIndex(PageIndexSortBuckets pisb) {
+      for (int i = 0; i < indexBuckets.buckets.length; i++) {
+        pisb.buckets[i].addAll(indexBuckets.buckets[i]);
+      }
     }
+  }
 
-    public void writeUnlock(int hash) {
-      findLock(hash).writeLock().unlock();
+  /**
+   * Index buckets affiliated to a page collection. Indexes are sorted into 
buckets according to
+   * spare space of their corresponding page instances.
+   */
+  protected static class PageIndexSortBuckets {
+    private final short[] bounds;
+    private final LinkedList<Integer>[] buckets;
+    private final Map<Integer, ISchemaPage> pageContainer;
+
+    public PageIndexSortBuckets(short[] borders, Map<Integer, ISchemaPage> 
container) {
+      bounds = Arrays.copyOf(borders, borders.length);
+      buckets = new LinkedList[borders.length];
+      pageContainer = container;
+      for (int i = 0; i < borders.length; i++) {
+        buckets[i] = new LinkedList<>();
+      }
     }
 
-    private ReentrantReadWriteLock findLock(int hash) {
-      return locks[hash % NUM_OF_LOCKS];
+    public void clear() {
+      for (Queue<Integer> q : buckets) {
+        q.clear();
+      }
+    }
+
+    public void sortIntoBucket(ISchemaPage page, short newSegSize) {
+      if (page.getAsSegmentedPage() == null) {
+        return;
+      }
+
+      // actual space occupied by a segment includes both its own length and 
the length of its
+      // offset. so available length for a segment is the spareSize minus the 
offset bytes
+      short availableSize =
+          newSegSize < 0
+              ? (short) (page.getAsSegmentedPage().getSpareSize() - 
SchemaFileConfig.SEG_OFF_DIG)
+              : (short)
+                  (page.getAsSegmentedPage().getSpareSize()
+                      - newSegSize
+                      - SchemaFileConfig.SEG_OFF_DIG);
+
+      // too small to index
+      if (availableSize <= SchemaFileConfig.SEG_HEADER_SIZE) {
+        return;
+      }
+
+      // be like: SEG_HEADER < buckets[0] <= bounds[0] < buckets[1] <= ...
+      for (int i = 0; i < bounds.length; i++) {
+        // the last of SEG_SIZE_LST is the maximum page size, definitely 
larger than others
+        if (availableSize <= bounds[i]) {
+          buckets[i].add(page.getPageIndex());
+          return;
+        }
+      }
+    }
+
+    /**
+     * @param withLock set if page container is a global/shared object
+     * @return the nearest fit page, whose index is removed from the bucket.
+     */
+    public synchronized ISchemaPage getNearestFitPage(short size, boolean 
withLock) {
+      ISchemaPage targetPage;
+      int elemToCheck;
+      for (int i = 0; i < buckets.length && pageContainer.size() > 0; i++) {
+        // buckets[i] stores pages with spare space less than bounds[i]
+        elemToCheck = buckets[i].size();
+        while (size < bounds[i] && elemToCheck > 0) {
+          // find roughly fit page
+          targetPage = pageContainer.getOrDefault(buckets[i].poll(), null);
+          elemToCheck--;
+
+          if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
+            // act as lazy remove on buckets
+            continue;
+          }
+
+          // seek in global container thus other page could be read locked
+          if (withLock
+              && targetPage.getAsSegmentedPage().isCapableForSegSize(size)
+              && targetPage.getLock().writeLock().tryLock()) {
+            if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+              return targetPage.getAsSegmentedPage();
+            }
+            targetPage.getLock().writeLock().unlock();
+          }
+
+          // only in local dirty pages which are always write locked by itself
+          if (!withLock && 
targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+            return targetPage;
+          }
+
+          // not large as expected, fit into suitable bucket
+          if (i > 0 && 
targetPage.getAsSegmentedPage().isCapableForSegSize(bounds[0])) {
+            sortIntoBucket(targetPage, (short) -1);
+          }
+        }
+      }
+      return null;
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index 71baa0cdc47..83f72b8e5f3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -135,7 +135,9 @@ public class SchemaFileLogTest {
       sf.getChildren(lastNode);
       fail();
     } catch (Exception e) {
-      Assert.assertEquals("Segment(index:0) not found in page(index:2).", 
e.getMessage());
+      Assert.assertEquals(
+          String.format("Segment(index:0) not found in page(index:%d).", 
corruptPageIndex),
+          e.getMessage());
     } finally {
       sf.close();
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 987185b9258..86490650270 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -230,7 +230,6 @@ public class SchemaFileTest {
     nsf.writeMNode(vt4);
 
     Assert.assertEquals(11111L, nsf.init().getAsDatabaseMNode().getDataTTL());
-
     nsf.close();
   }
 


Reply via email to