This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 070de203710 PBTree: Decouple PageManager (#11953)
070de203710 is described below

commit 070de203710edb0131bfa32208255c4c72df64a7
Author: ZhaoXin <[email protected]>
AuthorDate: Tue Jan 23 15:57:08 2024 +0800

    PBTree: Decouple PageManager (#11953)
---
 .../schemafile/pagemgr/BTreePageManager.java       |   8 +-
 .../pbtree/schemafile/pagemgr/PageIOChannel.java   | 184 +++++++++
 .../schemafile/pagemgr/PageIndexSortBuckets.java   | 129 ++++++
 .../pbtree/schemafile/pagemgr/PageManager.java     | 452 ++-------------------
 .../impl/pbtree/schemafile/pagemgr/PagePool.java   | 152 +++++++
 .../schemafile/pagemgr/SchemaPageContext.java      | 109 +++++
 6 files changed, 605 insertions(+), 429 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
index ddb47c97361..d6b87431d3b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
@@ -314,7 +314,7 @@ public class BTreePageManager extends PageManager {
 
   @Override
   public void delete(ICachedMNode node) throws IOException, MetadataException {
-    cacheGuardian();
+    pagePool.cacheGuardian();
     SchemaPageContext cxt = new SchemaPageContext();
     // node is the record deleted from its segment
     entrantLock(node.getParent(), cxt);
@@ -382,7 +382,7 @@ public class BTreePageManager extends PageManager {
       flushDirtyPages(cxt);
     } finally {
       releaseLocks(cxt);
-      releaseReferent(cxt);
+      pagePool.releaseReferent(cxt);
     }
   }
 
@@ -481,7 +481,7 @@ public class BTreePageManager extends PageManager {
       return child;
     } finally {
       initPage.getLock().readLock().unlock();
-      releaseReferent(cxt);
+      pagePool.releaseReferent(cxt);
       threadContexts.remove(Thread.currentThread().getId(), cxt);
     }
   }
@@ -566,7 +566,7 @@ public class BTreePageManager extends PageManager {
     } finally {
       // safety of iterator should be guaranteed by upper layer
       pageHeldLock.getLock().readLock().unlock();
-      releaseReferent(cxt);
+      pagePool.releaseReferent(cxt);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
new file mode 100644
index 00000000000..87a05cdca42
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogWriter;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile.getPageAddress;
+
+public class PageIOChannel {
+  private final FileChannel channel;
+  private final File pmtFile;
+  private FileChannel readChannel;
+  private final AtomicInteger logCounter;
+  private SchemaFileLogWriter logWriter;
+
+  // flush strategy is dependent on consensus protocol, only check protocol on init
+  protected FlushPageStrategy flushDirtyPagesStrategy;
+  protected SinglePageFlushStrategy singlePageFlushStrategy;
+
+  PageIOChannel(FileChannel channel, File pmtFile, boolean flushWithLogging, String logPath)
+      throws IOException, MetadataException {
+    this.channel = channel;
+    this.pmtFile = pmtFile;
+    this.readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
+    if (flushWithLogging) {
+      // without RATIS, utilize physical logging for integrity
+      int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
+      this.logWriter = new SchemaFileLogWriter(logPath);
+      logCounter = new AtomicInteger(pageAcc);
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+      singlePageFlushStrategy = this::flushSinglePageWithLogging;
+    } else {
+      // with RATIS enabled, integrity is guaranteed by consensus protocol
+      logCounter = new AtomicInteger();
+      logWriter = null;
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+      singlePageFlushStrategy = this::flushSinglePageWithoutLogging;
+    }
+  }
+
+  public void renewLogWriter() throws IOException {
+    if (logWriter != null) {
+      logWriter.renew();
+    }
+  }
+
+  public void closeLogWriter() throws IOException {
+    if (logWriter != null) {
+      logWriter.close();
+    }
+  }
+
+  /** Load bytes from log, deserialize and flush directly into channel, return current length */
+  private long recoverFromLog(String logPath) throws IOException, MetadataException {
+    SchemaFileLogReader reader = new SchemaFileLogReader(logPath);
+    ISchemaPage page;
+    List<byte[]> res = reader.collectUpdatedEntries();
+    for (byte[] entry : res) {
+      // TODO check bytes semantic correctness with CRC32 or other way
+      page = ISchemaPage.loadSchemaPage(ByteBuffer.wrap(entry));
+      page.flushPageToChannel(this.channel);
+    }
+    reader.close();
+
+    // complete log file
+    if (!res.isEmpty()) {
+      try (FileOutputStream outputStream = new FileOutputStream(logPath, true)) {
+        outputStream.write(new byte[] {SchemaFileConfig.SF_COMMIT_MARK});
+        return outputStream.getChannel().size();
+      }
+    }
+    return 0L;
+  }
+
+  public void loadFromFileToBuffer(ByteBuffer dst, int pageIndex) throws IOException {
+    dst.clear();
+    if (!readChannel.isOpen()) {
+      readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
+    }
+    readChannel.read(dst, getPageAddress(pageIndex));
+  }
+
+  // region Flush Strategy
+  @FunctionalInterface
+  interface FlushPageStrategy {
+    void apply(List<ISchemaPage> dirtyPages) throws IOException;
+  }
+
+  @FunctionalInterface
+  interface SinglePageFlushStrategy {
+    void apply(ISchemaPage page) throws IOException;
+  }
+
+  private void flushDirtyPagesWithLogging(List<ISchemaPage> dirtyPages) throws IOException {
+    if (dirtyPages.size() == 0) {
+      return;
+    }
+
+    if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
+      logWriter = logWriter.renew();
+      logCounter.set(0);
+    }
+
+    logCounter.addAndGet(dirtyPages.size());
+    for (ISchemaPage page : dirtyPages) {
+      page.syncPageBuffer();
+      logWriter.write(page);
+    }
+    logWriter.prepare();
+
+    for (ISchemaPage page : dirtyPages) {
+      page.flushPageToChannel(channel);
+    }
+    logWriter.commit();
+  }
+
+  private void flushSinglePageWithLogging(ISchemaPage page) throws IOException {
+    if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
+      logWriter = logWriter.renew();
+      logCounter.set(0);
+    }
+
+    logCounter.addAndGet(1);
+    page.syncPageBuffer();
+    logWriter.write(page);
+    logWriter.prepare();
+    page.flushPageToChannel(channel);
+    logWriter.commit();
+  }
+
+  private void flushDirtyPagesWithoutLogging(List<ISchemaPage> dirtyPages) throws IOException {
+    for (ISchemaPage page : dirtyPages) {
+      page.syncPageBuffer();
+      page.flushPageToChannel(channel);
+    }
+  }
+
+  private void flushSinglePageWithoutLogging(ISchemaPage page) throws IOException {
+    page.syncPageBuffer();
+    page.flushPageToChannel(channel);
+  }
+
+  public synchronized void flushMultiPages(SchemaPageContext cxt) throws IOException {
+    flushDirtyPagesStrategy.apply(
+        cxt.referredPages.values().stream()
+            .filter(ISchemaPage::isDirtyPage)
+            .collect(Collectors.toList()));
+  }
+
+  public void flushSinglePage(ISchemaPage page) throws IOException {
+    singlePageFlushStrategy.apply(page);
+  }
+  // endregion
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIndexSortBuckets.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIndexSortBuckets.java
new file mode 100644
index 00000000000..a0cf716dcc3
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIndexSortBuckets.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * * Index buckets affiliated to a page collection. Indexes are sort into buckets according to spare
+ * space of its corresponding page instance.
+ */
+class PageIndexSortBuckets {
+  private final short[] bounds;
+  private final ArrayDeque[] buckets;
+  private final Map<Integer, ISchemaPage> pageContainer;
+
+  public PageIndexSortBuckets(short[] borders, Map<Integer, ISchemaPage> container) {
+    bounds = Arrays.copyOf(borders, borders.length);
+    buckets = new ArrayDeque[borders.length];
+    pageContainer = container;
+    for (int i = 0; i < borders.length; i++) {
+      buckets[i] = new ArrayDeque();
+    }
+  }
+
+  public void clear() {
+    for (ArrayDeque q : buckets) {
+      q.clear();
+    }
+  }
+
+  public void sortIntoBucket(ISchemaPage page, short newSegSize) {
+    if (page.getAsSegmentedPage() == null) {
+      return;
+    }
+
+    // actual space occupied by a segment includes both its own length and the length of its
+    // offset. so available length for a segment is the spareSize minus the offset bytes
+    short availableSize =
+        newSegSize < 0
+            ? (short) (page.getAsSegmentedPage().getSpareSize() - SchemaFileConfig.SEG_OFF_DIG)
+            : (short)
+                (page.getAsSegmentedPage().getSpareSize()
+                    - newSegSize
+                    - SchemaFileConfig.SEG_OFF_DIG);
+
+    // too small to index
+    if (availableSize <= SchemaFileConfig.SEG_HEADER_SIZE) {
+      return;
+    }
+
+    // be like: SEG_HEADER < buckets[0] <= bounds[0] < buckets[1] <= ...
+    for (int i = 0; i < bounds.length; i++) {
+      // the last of SEG_SIZE_LST is the maximum page size, definitely larger than others
+      if (availableSize <= bounds[i]) {
+        buckets[i].add(page.getPageIndex());
+        return;
+      }
+    }
+  }
+
+  public ArrayDeque<Integer> getBucket(int index) {
+    return buckets[index];
+  }
+
+  /**
+   * @param withLock set if page container is a global/shared object
+   * @return the page index will be removed from the bucket.
+   */
+  public synchronized ISchemaPage getNearestFitPage(short size, boolean withLock) {
+    ISchemaPage targetPage;
+    int elemToCheck;
+    for (int i = 0; i < buckets.length && pageContainer.size() > 0; i++) {
+      // buckets[i] stores pages with spare space less than bounds[i]
+      elemToCheck = buckets[i].size();
+      while (size < bounds[i] && elemToCheck > 0) {
+        // find roughly fit page
+        targetPage = pageContainer.getOrDefault(buckets[i].poll(), null);
+        elemToCheck--;
+
+        if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
+          // act as lazy remove on buckets
+          continue;
+        }
+
+        // seek in global container thus other page could be read locked
+        if (withLock
+            && targetPage.getAsSegmentedPage().isCapableForSegSize(size)
+            && targetPage.getLock().writeLock().tryLock()) {
+          if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+            return targetPage.getAsSegmentedPage();
+          }
+          targetPage.getLock().writeLock().unlock();
+        }
+
+        // only in local dirty pages which are always write locked by itself
+        if (!withLock && targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+          return targetPage;
+        }
+
+        // not large as expected, fit into suitable bucket
+        if (i > 0 && targetPage.getAsSegmentedPage().isCapableForSegSize(bounds[0])) {
+          sortIntoBucket(targetPage, (short) -1);
+        }
+      }
+    }
+    return null;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index acd41d192af..ad92a332d57 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -32,35 +32,18 @@ import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafil
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SegmentedPage;
-import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
-import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -76,63 +59,29 @@ import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.sc
 public abstract class PageManager implements IPageManager {
   protected static final Logger logger = LoggerFactory.getLogger(PageManager.class);
 
-  protected final Map<Integer, ISchemaPage> pageInstCache;
-  // bucket for quick retrieval, only append when write operation finished
-  protected final PageIndexSortBuckets pageIndexBuckets;
-
-  protected final Lock cacheLock;
-  protected final Condition cacheFull;
+  protected final PagePool pagePool;
+  protected final PageIOChannel pageIOChannel;
 
   protected final Map<Long, SchemaPageContext> threadContexts;
 
   protected final AtomicInteger lastPageIndex;
 
-  private final FileChannel channel;
-
-  // handle timeout interruption during reading
-  private final File pmtFile;
-  private FileChannel readChannel;
-
-  private final AtomicInteger logCounter;
-  private SchemaFileLogWriter logWriter;
   private SchemaRegionCachedMetric metric = null;
 
-  // flush strategy is dependent on consensus protocol, only check protocol on init
-  protected FlushPageStrategy flushDirtyPagesStrategy;
-  protected SinglePageFlushStrategy singlePageFlushStrategy;
-
   PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String logPath)
       throws IOException, MetadataException {
-    this.pageInstCache =
-        Collections.synchronizedMap(new LinkedHashMap<>(SchemaFileConfig.PAGE_CACHE_SIZE, 1, true));
-    this.pageIndexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, pageInstCache);
+    this.pagePool = new PagePool();
     this.threadContexts = new ConcurrentHashMap<>();
-
-    this.cacheLock = new ReentrantLock();
-    this.cacheFull = this.cacheLock.newCondition();
-
     this.lastPageIndex =
         lastPageIndex >= 0 ? new AtomicInteger(lastPageIndex) : new AtomicInteger(0);
-    this.channel = channel;
-    this.pmtFile = pmtFile;
-    this.readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
 
     if (IoTDBDescriptor.getInstance()
         .getConfig()
         .getSchemaRegionConsensusProtocolClass()
         .equals(ConsensusFactory.RATIS_CONSENSUS)) {
-      // with RATIS enabled, integrity is guaranteed by consensus protocol
-      logCounter = new AtomicInteger();
-      logWriter = null;
-      flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
-      singlePageFlushStrategy = this::flushSinglePageWithoutLogging;
+      pageIOChannel = new PageIOChannel(channel, pmtFile, false, logPath);
     } else {
-      // without RATIS, utilize physical logging for integrity
-      int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
-      this.logWriter = new SchemaFileLogWriter(logPath);
-      logCounter = new AtomicInteger(pageAcc);
-      flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
-      singlePageFlushStrategy = this::flushSinglePageWithLogging;
+      pageIOChannel = new PageIOChannel(channel, pmtFile, true, logPath);
     }
 
     // construct first page if file to init
@@ -140,81 +89,17 @@ public abstract class PageManager implements IPageManager {
       ISegmentedPage rootPage =
           ISchemaPage.initSegmentedPage(ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH), 0);
       rootPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
-      pageInstCache.put(rootPage.getPageIndex(), rootPage);
+      pagePool.put(rootPage);
       rootPage.syncPageBuffer();
       rootPage.flushPageToChannel(channel);
     }
   }
 
-  /** Load bytes from log, deserialize and flush directly into channel, return current length */
-  private long recoverFromLog(String logPath) throws IOException, MetadataException {
-    SchemaFileLogReader reader = new SchemaFileLogReader(logPath);
-    ISchemaPage page;
-    List<byte[]> res = reader.collectUpdatedEntries();
-    for (byte[] entry : res) {
-      // TODO check bytes semantic correctness with CRC32 or other way
-      page = ISchemaPage.loadSchemaPage(ByteBuffer.wrap(entry));
-      page.flushPageToChannel(this.channel);
-    }
-    reader.close();
-
-    // complete log file
-    if (!res.isEmpty()) {
-      try (FileOutputStream outputStream = new FileOutputStream(logPath, true)) {
-        outputStream.write(new byte[] {SchemaFileConfig.SF_COMMIT_MARK});
-        return outputStream.getChannel().size();
-      }
-    }
-    return 0L;
-  }
-
-  /**
-   * A rough cache size guardian, all threads passed this entrant check will not be limited with
-   * cache size anymore. TODO A better guardian is based on constraint per thread.
-   */
-  protected void cacheGuardian() {
-    cacheLock.lock();
-    try {
-      while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
-        try {
-          // try to evict by LRU
-          Iterator<ISchemaPage> iterator = pageInstCache.values().iterator();
-          int pageSizeLimit = SchemaFileConfig.PAGE_CACHE_SIZE, size = pageInstCache.size();
-
-          ISchemaPage p;
-          while (iterator.hasNext()) {
-            p = iterator.next();
-
-            if (size <= pageSizeLimit) {
-              break;
-            }
-
-            if (p.getRefCnt().get() == 0) {
-              iterator.remove();
-              size--;
-            }
-          }
-
-          if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
-            // wait until another operation finished and released pages
-            cacheFull.await();
-          }
-        } catch (InterruptedException e) {
-          logger.warn(
-              "Interrupted during page cache eviction. Consider increasing cache size, "
-                  + "reducing concurrency, or extending timeout");
-        }
-      }
-    } finally {
-      cacheLock.unlock();
-    }
-  }
-
   @Override
   public void writeMNode(ICachedMNode node) throws MetadataException, IOException {
     SchemaPageContext cxt = new SchemaPageContext();
     threadContexts.put(Thread.currentThread().getId(), cxt);
-    cacheGuardian();
+    pagePool.cacheGuardian();
     entrantLock(node, cxt);
     try {
       writeNewChildren(node, cxt);
@@ -222,7 +107,7 @@ public abstract class PageManager implements IPageManager {
       flushDirtyPages(cxt);
     } finally {
       releaseLocks(cxt);
-      releaseReferent(cxt);
+      pagePool.releaseReferent(cxt);
       threadContexts.remove(Thread.currentThread().getId(), cxt);
     }
   }
@@ -234,33 +119,6 @@ public abstract class PageManager implements IPageManager {
     }
   }
 
-  /** release referents and evict likely useless page if necessary */
-  protected void releaseReferent(SchemaPageContext cxt) {
-    for (ISchemaPage p : cxt.referredPages.values()) {
-      p.decrementAndGetRefCnt();
-    }
-
-    if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
-      cacheLock.lock();
-      try {
-        for (ISchemaPage p : cxt.referredPages.values()) {
-          // unnecessary to evict the page object in context by 2 case:
-          //  1. it is held by another thread, e.g., RefCnt != 0
-          //  2. it had already been evicted, e.g., pageCache.get(id) != page
-          if (p.getRefCnt().get() == 0 && pageInstCache.get(p.getPageIndex()) == p) {
-            pageInstCache.remove(p.getPageIndex());
-          }
-        }
-
-        if (pageInstCache.size() <= SchemaFileConfig.PAGE_CACHE_SIZE) {
-          cacheFull.signal();
-        }
-      } finally {
-        cacheLock.unlock();
-      }
-    }
-  }
-
   /** locking in the order of page index to avoid deadlock. */
   protected void entrantLock(ICachedMNode node, SchemaPageContext cxt)
       throws IOException, MetadataException {
@@ -567,74 +425,13 @@ public abstract class PageManager implements IPageManager {
   // endregion
 
   // region Flush Strategy
-  @FunctionalInterface
-  interface FlushPageStrategy {
-    void apply(List<ISchemaPage> dirtyPages) throws IOException;
-  }
-
-  @FunctionalInterface
-  interface SinglePageFlushStrategy {
-    void apply(ISchemaPage page) throws IOException;
-  }
-
-  private void flushDirtyPagesWithLogging(List<ISchemaPage> dirtyPages) throws IOException {
-    if (dirtyPages.size() == 0) {
-      return;
-    }
-
-    if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
-      logWriter = logWriter.renew();
-      logCounter.set(0);
-    }
-
-    logCounter.addAndGet(dirtyPages.size());
-    for (ISchemaPage page : dirtyPages) {
-      page.syncPageBuffer();
-      logWriter.write(page);
-    }
-    logWriter.prepare();
-
-    for (ISchemaPage page : dirtyPages) {
-      page.flushPageToChannel(channel);
-    }
-    logWriter.commit();
-  }
-
-  private void flushSinglePageWithLogging(ISchemaPage page) throws IOException {
-    if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
-      logWriter = logWriter.renew();
-      logCounter.set(0);
-    }
-
-    logCounter.addAndGet(1);
-    page.syncPageBuffer();
-    logWriter.write(page);
-    logWriter.prepare();
-    page.flushPageToChannel(channel);
-    logWriter.commit();
-  }
-
-  private void flushDirtyPagesWithoutLogging(List<ISchemaPage> dirtyPages) throws IOException {
-    for (ISchemaPage page : dirtyPages) {
-      page.syncPageBuffer();
-      page.flushPageToChannel(channel);
-    }
-  }
-
-  private void flushSinglePageWithoutLogging(ISchemaPage page) throws IOException {
-    page.syncPageBuffer();
-    page.flushPageToChannel(channel);
-  }
 
   public synchronized void flushDirtyPages(SchemaPageContext cxt) throws IOException {
     if (cxt.dirtyCnt == 0) {
       return;
     }
-    flushDirtyPagesStrategy.apply(
-        cxt.referredPages.values().stream()
-            .filter(ISchemaPage::isDirtyPage)
-            .collect(Collectors.toList()));
-    cxt.appendBucketIndex(pageIndexBuckets);
+    pageIOChannel.flushMultiPages(cxt);
+    pagePool.appendBucketIndex(cxt);
     if (metric != null) {
       metric.recordFlushPageNum(cxt.referredPages.size());
     }
@@ -645,6 +442,7 @@ public abstract class PageManager implements IPageManager {
    * the lastLeaf, unlock, deref, and remove it from dirtyPages. lastLeafPage only initiated at
    * overflowOperation
    */
+  @Deprecated
   private void interleavedFlush(ISchemaPage page, SchemaPageContext cxt) throws IOException {
     if (cxt.lastLeafPage == null || cxt.lastLeafPage.getPageIndex() == page.getPageIndex()) {
       return;
@@ -653,7 +451,7 @@ public abstract class PageManager implements IPageManager {
     if (metric != null) {
       metric.recordFlushPageNum(1);
     }
-    singlePageFlushStrategy.apply(cxt.lastLeafPage);
+    pageIOChannel.flushSinglePage(cxt.lastLeafPage);
     // this lastLeaf shall only be lock once
     cxt.dirtyCnt--;
 
@@ -667,7 +465,7 @@ public abstract class PageManager implements IPageManager {
     // can be reclaimed since the page only referred by pageInstCache
     cxt.referredPages.remove(cxt.lastLeafPage.getPageIndex());
     // alleviate eviction pressure
-    pageInstCache.remove(cxt.lastLeafPage.getPageIndex());
+    pagePool.remove(cxt.lastLeafPage.getPageIndex());
 
     cxt.lastLeafPage = page.getAsSegmentedPage();
   }
@@ -681,9 +479,9 @@ public abstract class PageManager implements IPageManager {
 
   @Override
   public void clear() throws IOException, MetadataException {
-    pageInstCache.clear();
+    pagePool.clear();
     lastPageIndex.set(0);
-    logWriter = logWriter == null ? null : logWriter.renew();
+    pageIOChannel.renewLogWriter();
   }
 
   @Override
@@ -700,9 +498,7 @@ public abstract class PageManager implements IPageManager {
 
   @Override
   public void close() throws IOException {
-    if (logWriter != null) {
-      logWriter.close();
-    }
+    pageIOChannel.closeLogWriter();
   }
 
   // endregion
@@ -723,9 +519,9 @@ public abstract class PageManager implements IPageManager {
 
     // lock for no duplicate page with same index from disk, and guarantees page will not be evicted
     //  by other thread before referred by current thread
-    cacheLock.lock();
+    pagePool.lock();
     try {
-      ISchemaPage page = pageInstCache.get(pageIdx);
+      ISchemaPage page = pagePool.get(pageIdx);
       if (page != null) {
         cxt.refer(page);
         return page;
@@ -735,13 +531,13 @@ public abstract class PageManager implements IPageManager {
       if (metric != null) {
         metric.recordLoadPageNum(1);
       }
-      loadFromFile(newBuf, pageIdx);
+      pageIOChannel.loadFromFileToBuffer(newBuf, pageIdx);
       page = ISchemaPage.loadSchemaPage(newBuf);
       cxt.refer(page);
-      addPageToCache(page);
+      pagePool.put(page);
       return page;
     } finally {
-      cacheLock.unlock();
+      pagePool.unlock();
     }
   }
 
@@ -754,7 +550,7 @@ public abstract class PageManager implements IPageManager {
   protected ISchemaPage replacePageInCache(ISchemaPage page, SchemaPageContext cxt) {
     // no need to lock since the root of B+Tree is locked
     cxt.markDirty(page, true);
-    addPageToCache(page);
+    pagePool.put(page);
     return page;
   }
 
@@ -773,10 +569,10 @@ public abstract class PageManager implements IPageManager {
       return targetPage.getAsSegmentedPage();
     }
 
-    cacheLock.lock();
+    pagePool.lock();
     try {
       // pageIndexBuckets sorts pages within pageInstCache into buckets to expedite access
-      targetPage = pageIndexBuckets.getNearestFitPage(size, true);
+      targetPage = pagePool.getNearestFitPage(size);
       if (targetPage != null) {
         cxt.markDirty(targetPage);
         cxt.traceLock(targetPage);
@@ -792,7 +588,7 @@ public abstract class PageManager implements IPageManager {
       cxt.indexBuckets.sortIntoBucket(targetPage, size);
       return targetPage.getAsSegmentedPage();
     } finally {
-      cacheLock.unlock();
+      pagePool.unlock();
     }
   }
 
@@ -804,7 +600,7 @@ public abstract class PageManager implements IPageManager {
     cxt.markDirty(newPage);
     newPage.getLock().writeLock().lock();
     cxt.traceLock(newPage);
-    addPageToCache(newPage);
+    pagePool.put(newPage);
     return newPage;
   }
 
@@ -812,25 +608,11 @@ public abstract class PageManager implements IPageManager 
{
     // new page will be LOCAL until the creating thread finished
     //  thus not added to sparsePageIndex now
     page.setPageIndex(lastPageIndex.incrementAndGet());
-    addPageToCache(page);
+    pagePool.put(page);
     cxt.markDirty(page);
     return page;
   }
 
-  protected ISchemaPage addPageToCache(ISchemaPage page) {
-    // size control is left to operation entrance
-    // return value could use to assess whether key already existed
-    return this.pageInstCache.put(page.getPageIndex(), page);
-  }
-
-  private int loadFromFile(ByteBuffer dst, int pageIndex) throws IOException {
-    dst.clear();
-    if (!readChannel.isOpen()) {
-      readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
-    }
-    return readChannel.read(dst, getPageAddress(pageIndex));
-  }
-
   private void updateParentalRecord(
       ICachedMNode parent, String key, long newSegAddr, SchemaPageContext cxt)
       throws IOException, MetadataException {
@@ -978,186 +760,6 @@ public abstract class PageManager implements IPageManager 
{
 
   // endregion
 
-  /** Thread local variables about write/update process. */
-  protected static class SchemaPageContext {
-    final long threadID;
-    final PageIndexSortBuckets indexBuckets;
-    // locked and dirty pages are all referred pages, they all reside in page 
cache
-    final Map<Integer, ISchemaPage> referredPages;
-    final Set<Integer> lockTraces;
-    // track B+Tree traversal trace
-    final int[] treeTrace;
-    int dirtyCnt;
-    int interleavedFlushCnt;
-
-    // flush B+Tree leaf before operation finished since all records are 
ordered
-    ISegmentedPage lastLeafPage;
-
-    public SchemaPageContext() {
-      threadID = Thread.currentThread().getId();
-      referredPages = new HashMap<>();
-      indexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, 
referredPages);
-      treeTrace = new int[16];
-      lockTraces = new HashSet<>();
-      lastLeafPage = null;
-      dirtyCnt = 0;
-      interleavedFlushCnt = 0;
-    }
-
-    protected void markDirty(ISchemaPage page) {
-      markDirty(page, false);
-    }
-
-    protected void markDirty(ISchemaPage page, boolean forceReplace) {
-      if (!page.isDirtyPage()) {
-        dirtyCnt++;
-      }
-      page.setDirtyFlag();
-      refer(page);
-      if (forceReplace && referredPages.containsKey(page.getPageIndex())) {
-
-        // previous page is dirty, so it's not a new dirty page
-        if (referredPages.get(page.getPageIndex()).isDirtyPage()) {
-          dirtyCnt--;
-        }
-        // force to replace
-        referredPages.put(page.getPageIndex(), page);
-      }
-    }
-
-    protected void traceLock(ISchemaPage page) {
-      refer(page);
-      lockTraces.add(page.getPageIndex());
-    }
-
-    // referred pages will not be evicted until operation finished
-    private void refer(ISchemaPage page) {
-      if (!referredPages.containsKey(page.getPageIndex())) {
-        page.incrementAndGetRefCnt();
-        referredPages.put(page.getPageIndex(), page);
-      }
-    }
-
-    /**
-     * Since records are ordered for write operation, it is reasonable to 
flush those left siblings
-     * of the active leaf page. The target page would be initiated at the 
first split within the
-     * operation.
-     *
-     * @param page left leaf of the split
-     */
-    public void invokeLastLeaf(ISchemaPage page) {
-      // only record at the first split
-      if (lastLeafPage == null) {
-        lastLeafPage = page.getAsSegmentedPage();
-      }
-    }
-
-    private void appendBucketIndex(PageIndexSortBuckets pisb) {
-      for (int i = 0; i < indexBuckets.buckets.length; i++) {
-        pisb.buckets[i].addAll(indexBuckets.buckets[i]);
-      }
-    }
-  }
-
-  /**
-   * * Index buckets affiliated to a page collection. Indexes are sort into 
buckets according to
-   * spare space of its corresponding page instance.
-   */
-  protected static class PageIndexSortBuckets {
-    private final short[] bounds;
-    private final LinkedList<Integer>[] buckets;
-    private final Map<Integer, ISchemaPage> pageContainer;
-
-    public PageIndexSortBuckets(short[] borders, Map<Integer, ISchemaPage> 
container) {
-      bounds = Arrays.copyOf(borders, borders.length);
-      buckets = (LinkedList<Integer>[]) new LinkedList[borders.length];
-      pageContainer = container;
-      for (int i = 0; i < borders.length; i++) {
-        buckets[i] = new LinkedList<>();
-      }
-    }
-
-    public void clear() {
-      for (Queue<Integer> q : buckets) {
-        q.clear();
-      }
-    }
-
-    public void sortIntoBucket(ISchemaPage page, short newSegSize) {
-      if (page.getAsSegmentedPage() == null) {
-        return;
-      }
-
-      // actual space occupied by a segment includes both its own length and 
the length of its
-      // offset. so available length for a segment is the spareSize minus the 
offset bytes
-      short availableSize =
-          newSegSize < 0
-              ? (short) (page.getAsSegmentedPage().getSpareSize() - 
SchemaFileConfig.SEG_OFF_DIG)
-              : (short)
-                  (page.getAsSegmentedPage().getSpareSize()
-                      - newSegSize
-                      - SchemaFileConfig.SEG_OFF_DIG);
-
-      // too small to index
-      if (availableSize <= SchemaFileConfig.SEG_HEADER_SIZE) {
-        return;
-      }
-
-      // be like: SEG_HEADER < buckets[0] <= bounds[0] < buckets[1] <= ...
-      for (int i = 0; i < bounds.length; i++) {
-        // the last of SEG_SIZE_LST is the maximum page size, definitely 
larger than others
-        if (availableSize <= bounds[i]) {
-          buckets[i].add(page.getPageIndex());
-          return;
-        }
-      }
-    }
-
-    /**
-     * @param withLock set if page container is a global/shared object
-     * @return the page index will be removed from the bucket.
-     */
-    public synchronized ISchemaPage getNearestFitPage(short size, boolean 
withLock) {
-      ISchemaPage targetPage;
-      int elemToCheck;
-      for (int i = 0; i < buckets.length && pageContainer.size() > 0; i++) {
-        // buckets[i] stores pages with spare space less than bounds[i]
-        elemToCheck = buckets[i].size();
-        while (size < bounds[i] && elemToCheck > 0) {
-          // find roughly fit page
-          targetPage = pageContainer.getOrDefault(buckets[i].poll(), null);
-          elemToCheck--;
-
-          if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
-            // act as lazy remove on buckets
-            continue;
-          }
-
-          // seek in global container thus other page could be read locked
-          if (withLock
-              && targetPage.getAsSegmentedPage().isCapableForSegSize(size)
-              && targetPage.getLock().writeLock().tryLock()) {
-            if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
-              return targetPage.getAsSegmentedPage();
-            }
-            targetPage.getLock().writeLock().unlock();
-          }
-
-          // only in local dirty pages which are always write locked by itself
-          if (!withLock && 
targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
-            return targetPage;
-          }
-
-          // not large as expected, fit into suitable bucket
-          if (i > 0 && 
targetPage.getAsSegmentedPage().isCapableForSegSize(bounds[0])) {
-            sortIntoBucket(targetPage, (short) -1);
-          }
-        }
-      }
-      return null;
-    }
-  }
-
   @Override
   public void setMetric(SchemaRegionCachedMetric metric) {
     this.metric = metric;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
new file mode 100644
index 00000000000..7cd9d4d5860
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * In-memory pool of {@link ISchemaPage} instances keyed by page index.
+ *
+ * <p>The backing map is an access-ordered {@link LinkedHashMap} wrapped by {@link
+ * Collections#synchronizedMap(Map)}: single-key calls are individually synchronized, while any
+ * traversal of its views must hold the map's own monitor. Compound operations are additionally
+ * serialized by {@code cacheLock}, which callers reach through {@link #lock()} and {@link
+ * #unlock()}.
+ */
+public class PagePool {
+  private static final Logger logger = LoggerFactory.getLogger(PagePool.class);
+
+  // access-ordered, so iteration starts from the least recently accessed page
+  private final Map<Integer, ISchemaPage> pageInstCache;
+  // serializes compound cache operations and owns the cacheFull condition
+  private final Lock cacheLock;
+  // awaited in cacheGuardian(), signalled in releaseReferent(SchemaPageContext)
+  private final Condition cacheFull;
+
+  // bucket index built over pageInstCache, used by getNearestFitPage(short)
+  private final PageIndexSortBuckets pageIndexBuckets;
+
+  PagePool() {
+    this.pageInstCache =
+        Collections.synchronizedMap(new LinkedHashMap<>(SchemaFileConfig.PAGE_CACHE_SIZE, 1, true));
+    this.pageIndexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, pageInstCache);
+
+    this.cacheLock = new ReentrantLock();
+    this.cacheFull = this.cacheLock.newCondition();
+  }
+
+  /**
+   * A rough cache size guardian, to be passed at the entrance of an operation. A thread that has
+   * passed this check is not limited by the cache size anymore. TODO A better guardian is based
+   * on constraint per thread.
+   *
+   * <p>While the cache holds more than {@link SchemaFileConfig#PAGE_CACHE_SIZE} pages, unreferred
+   * pages are evicted in LRU order; if that is not enough the calling thread blocks until {@link
+   * #releaseReferent(SchemaPageContext)} signals that pages were released.
+   */
+  public void cacheGuardian() {
+    cacheLock.lock();
+    try {
+      while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+        try {
+          // try to evict by LRU
+          evictUnreferredPages();
+
+          if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+            // wait until another operation finished and released pages; this must stay outside
+            // of the map monitor since await() only releases cacheLock
+            cacheFull.await();
+          }
+        } catch (InterruptedException e) {
+          // NOTE(review): the interrupt status is consumed here and the loop keeps waiting.
+          // Restoring it may affect later interruptible channel I/O of the caller - confirm the
+          // intended policy before changing this.
+          logger.warn(
+              "Interrupted during page cache eviction. Consider increasing cache size, "
+                  + "reducing concurrency, or extending timeout");
+        }
+      }
+    } finally {
+      cacheLock.unlock();
+    }
+  }
+
+  /**
+   * Removes pages with a zero reference count, least recently accessed first, until the cache is
+   * back within {@link SchemaFileConfig#PAGE_CACHE_SIZE} or no candidate is left.
+   *
+   * <p>The traversal holds the monitor of the synchronized map, as required when iterating over a
+   * view of a {@link Collections#synchronizedMap(Map)}; otherwise a concurrent {@code put} or
+   * {@code get}, both structural modifications in access order, may break the iterator.
+   */
+  private void evictUnreferredPages() {
+    synchronized (pageInstCache) {
+      int excess = pageInstCache.size() - SchemaFileConfig.PAGE_CACHE_SIZE;
+      Iterator<ISchemaPage> iterator = pageInstCache.values().iterator();
+      while (excess > 0 && iterator.hasNext()) {
+        if (iterator.next().getRefCnt().get() == 0) {
+          iterator.remove();
+          excess--;
+        }
+      }
+    }
+  }
+
+  /**
+   * Caches the page under its page index, replacing any instance cached under the same index.
+   *
+   * @param page page to cache
+   */
+  public void put(ISchemaPage page) {
+    pageInstCache.put(page.getPageIndex(), page);
+  }
+
+  /** Acquires the pool-wide lock; every call must be paired with {@link #unlock()}. */
+  public void lock() {
+    cacheLock.lock();
+  }
+
+  /** Releases the pool-wide lock taken by {@link #lock()}. */
+  public void unlock() {
+    cacheLock.unlock();
+  }
+
+  /**
+   * @param index page index
+   * @return the cached page, or {@code null} if no page is cached under that index
+   */
+  public ISchemaPage get(int index) {
+    return pageInstCache.get(index);
+  }
+
+  /**
+   * Looks up a cached page able to hold a segment of the expected size through the bucket index.
+   * The lookup is delegated with its lock flag set; presumably a returned page is already write
+   * locked for the caller - confirm against {@link PageIndexSortBuckets}.
+   *
+   * @param expectedSize size of the segment to place
+   * @return whatever the bucket index returns, {@code null} included
+   */
+  public ISchemaPage getNearestFitPage(short expectedSize) {
+    return pageIndexBuckets.getNearestFitPage(expectedSize, true);
+  }
+
+  /**
+   * Drops the page cached under the given index, if any.
+   *
+   * @param index page index
+   */
+  public void remove(int index) {
+    pageInstCache.remove(index);
+  }
+
+  /** Drops every cached page; the bucket index is not touched by this call. */
+  public void clear() {
+    pageInstCache.clear();
+  }
+
+  /**
+   * Merges the bucket index collected by the context into the bucket index of this pool.
+   *
+   * @param cxt context of a finished operation
+   */
+  public void appendBucketIndex(SchemaPageContext cxt) {
+    cxt.appendBucketIndex(pageIndexBuckets);
+  }
+
+  /**
+   * Releases the referents held by the context and evicts likely useless pages if the cache has
+   * grown beyond {@link SchemaFileConfig#PAGE_CACHE_SIZE}.
+   *
+   * @param cxt context whose referred pages are released
+   */
+  protected void releaseReferent(SchemaPageContext cxt) {
+    for (ISchemaPage p : cxt.referredPages.values()) {
+      p.decrementAndGetRefCnt();
+    }
+
+    if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+      cacheLock.lock();
+      try {
+        for (ISchemaPage p : cxt.referredPages.values()) {
+          // it is unnecessary to evict a page object of the context in 2 cases:
+          //  1. it is held by another thread, i.e., RefCnt != 0
+          //  2. it had already been evicted, i.e., pageCache.get(id) != page
+          if (p.getRefCnt().get() == 0 && pageInstCache.get(p.getPageIndex()) == p) {
+            pageInstCache.remove(p.getPageIndex());
+          }
+        }
+
+        if (pageInstCache.size() <= SchemaFileConfig.PAGE_CACHE_SIZE) {
+          // wake every waiter: each of them re-checks the cache size in its own loop, whereas a
+          // single signal() could leave the others parked although the cache has room again
+          cacheFull.signalAll();
+        }
+      } finally {
+        cacheLock.unlock();
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/SchemaPageContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/SchemaPageContext.java
new file mode 100644
index 00000000000..d494be5a097
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/SchemaPageContext.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISegmentedPage;
+import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Per-operation bookkeeping of a write/update process: the pages it referred, locked and dirtied,
+ * plus the B+Tree traversal trace. An instance records the id of the thread that created it.
+ */
+class SchemaPageContext {
+  final long threadID;
+  final PageIndexSortBuckets indexBuckets;
+  // every locked or dirty page is a referred page; all of them reside in page cache
+  final Map<Integer, ISchemaPage> referredPages;
+  final Set<Integer> lockTraces;
+  // trace of the B+Tree traversal
+  final int[] treeTrace;
+  int dirtyCnt;
+  int interleavedFlushCnt;
+
+  // records are ordered, so a B+Tree leaf may be flushed before the operation finishes
+  ISegmentedPage lastLeafPage;
+
+  public SchemaPageContext() {
+    this.threadID = Thread.currentThread().getId();
+    // the bucket index is built over the referred pages, so the map has to exist first
+    this.referredPages = new HashMap<>();
+    this.indexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, this.referredPages);
+    this.treeTrace = new int[16];
+    this.lockTraces = new HashSet<>();
+    this.lastLeafPage = null;
+    this.dirtyCnt = 0;
+    this.interleavedFlushCnt = 0;
+  }
+
+  /**
+   * Marks the page dirty and refers it, without replacing an instance already referred under the
+   * same page index.
+   *
+   * @param page page modified by the operation
+   */
+  public void markDirty(ISchemaPage page) {
+    markDirty(page, false);
+  }
+
+  /**
+   * Marks the page dirty, refers it and keeps {@code dirtyCnt} up to date.
+   *
+   * @param page page modified by the operation
+   * @param forceReplace whether {@code page} shall take over the slot of its page index among the
+   *     referred pages
+   */
+  public void markDirty(ISchemaPage page, boolean forceReplace) {
+    final boolean cleanBefore = !page.isDirtyPage();
+    if (cleanBefore) {
+      dirtyCnt++;
+    }
+    page.setDirtyFlag();
+    refer(page);
+
+    if (!forceReplace) {
+      return;
+    }
+
+    final int pageIndex = page.getPageIndex();
+    if (!referredPages.containsKey(pageIndex)) {
+      return;
+    }
+
+    // the page referred so far is dirty, hence this index is not a new dirty page
+    if (referredPages.get(pageIndex).isDirtyPage()) {
+      dirtyCnt--;
+    }
+    // force to replace
+    referredPages.put(pageIndex, page);
+  }
+
+  /**
+   * Refers the page and records its index as locked by this operation.
+   *
+   * @param page page locked by the operation
+   */
+  public void traceLock(ISchemaPage page) {
+    refer(page);
+    lockTraces.add(page.getPageIndex());
+  }
+
+  /**
+   * Refers the page once per page index; referred pages will not be evicted until the operation
+   * finished.
+   *
+   * @param page page to keep referred
+   */
+  public void refer(ISchemaPage page) {
+    final int pageIndex = page.getPageIndex();
+    if (referredPages.containsKey(pageIndex)) {
+      return;
+    }
+    page.incrementAndGetRefCnt();
+    referredPages.put(pageIndex, page);
+  }
+
+  /**
+   * Remembers the left leaf of the first split within the operation. Since records are ordered
+   * for write operation, it is reasonable to flush the left siblings of the active leaf page.
+   *
+   * @param page left leaf of the split
+   */
+  public void invokeLastLeaf(ISchemaPage page) {
+    // later splits do not overwrite the page recorded at the first one
+    if (lastLeafPage != null) {
+      return;
+    }
+    lastLeafPage = page.getAsSegmentedPage();
+  }
+
+  /**
+   * Appends every bucket of this context to the bucket of the same position in {@code pisb}.
+   *
+   * @param pisb bucket index receiving the entries
+   */
+  public void appendBucketIndex(PageIndexSortBuckets pisb) {
+    final int bucketCount = SchemaFileConfig.SEG_SIZE_LST.length;
+    for (int bucket = 0; bucket < bucketCount; bucket++) {
+      pisb.getBucket(bucket).addAll(indexBuckets.getBucket(bucket));
+    }
+  }
+}

Reply via email to