This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 070de203710 PBTree: Decouple PageManager (#11953)
070de203710 is described below
commit 070de203710edb0131bfa32208255c4c72df64a7
Author: ZhaoXin <[email protected]>
AuthorDate: Tue Jan 23 15:57:08 2024 +0800
PBTree: Decouple PageManager (#11953)
---
.../schemafile/pagemgr/BTreePageManager.java | 8 +-
.../pbtree/schemafile/pagemgr/PageIOChannel.java | 184 +++++++++
.../schemafile/pagemgr/PageIndexSortBuckets.java | 129 ++++++
.../pbtree/schemafile/pagemgr/PageManager.java | 452 ++-------------------
.../impl/pbtree/schemafile/pagemgr/PagePool.java | 152 +++++++
.../schemafile/pagemgr/SchemaPageContext.java | 109 +++++
6 files changed, 605 insertions(+), 429 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
index ddb47c97361..d6b87431d3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
@@ -314,7 +314,7 @@ public class BTreePageManager extends PageManager {
@Override
public void delete(ICachedMNode node) throws IOException, MetadataException {
- cacheGuardian();
+ pagePool.cacheGuardian();
SchemaPageContext cxt = new SchemaPageContext();
// node is the record deleted from its segment
entrantLock(node.getParent(), cxt);
@@ -382,7 +382,7 @@ public class BTreePageManager extends PageManager {
flushDirtyPages(cxt);
} finally {
releaseLocks(cxt);
- releaseReferent(cxt);
+ pagePool.releaseReferent(cxt);
}
}
@@ -481,7 +481,7 @@ public class BTreePageManager extends PageManager {
return child;
} finally {
initPage.getLock().readLock().unlock();
- releaseReferent(cxt);
+ pagePool.releaseReferent(cxt);
threadContexts.remove(Thread.currentThread().getId(), cxt);
}
}
@@ -566,7 +566,7 @@ public class BTreePageManager extends PageManager {
} finally {
// safety of iterator should be guaranteed by upper layer
pageHeldLock.getLock().readLock().unlock();
- releaseReferent(cxt);
+ pagePool.releaseReferent(cxt);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
new file mode 100644
index 00000000000..87a05cdca42
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogWriter;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile.getPageAddress;
+
+public class PageIOChannel {
+ private final FileChannel channel;
+ private final File pmtFile;
+ private FileChannel readChannel;
+ private final AtomicInteger logCounter;
+ private SchemaFileLogWriter logWriter;
+
+ // flush strategy is dependent on consensus protocol, only check protocol on
init
+ protected FlushPageStrategy flushDirtyPagesStrategy;
+ protected SinglePageFlushStrategy singlePageFlushStrategy;
+
+ PageIOChannel(FileChannel channel, File pmtFile, boolean flushWithLogging,
String logPath)
+ throws IOException, MetadataException {
+ this.channel = channel;
+ this.pmtFile = pmtFile;
+ this.readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
+ if (flushWithLogging) {
+ // without RATIS, utilize physical logging for integrity
+ int pageAcc = (int) recoverFromLog(logPath) /
SchemaFileConfig.PAGE_LENGTH;
+ this.logWriter = new SchemaFileLogWriter(logPath);
+ logCounter = new AtomicInteger(pageAcc);
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+ singlePageFlushStrategy = this::flushSinglePageWithLogging;
+ } else {
+ // with RATIS enabled, integrity is guaranteed by consensus protocol
+ logCounter = new AtomicInteger();
+ logWriter = null;
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+ singlePageFlushStrategy = this::flushSinglePageWithoutLogging;
+ }
+ }
+
+ public void renewLogWriter() throws IOException {
+ if (logWriter != null) {
+ logWriter.renew();
+ }
+ }
+
+ public void closeLogWriter() throws IOException {
+ if (logWriter != null) {
+ logWriter.close();
+ }
+ }
+
+ /** Load bytes from log, deserialize and flush directly into channel, return
current length */
+ private long recoverFromLog(String logPath) throws IOException,
MetadataException {
+ SchemaFileLogReader reader = new SchemaFileLogReader(logPath);
+ ISchemaPage page;
+ List<byte[]> res = reader.collectUpdatedEntries();
+ for (byte[] entry : res) {
+ // TODO check bytes semantic correctness with CRC32 or other way
+ page = ISchemaPage.loadSchemaPage(ByteBuffer.wrap(entry));
+ page.flushPageToChannel(this.channel);
+ }
+ reader.close();
+
+ // complete log file
+ if (!res.isEmpty()) {
+ try (FileOutputStream outputStream = new FileOutputStream(logPath,
true)) {
+ outputStream.write(new byte[] {SchemaFileConfig.SF_COMMIT_MARK});
+ return outputStream.getChannel().size();
+ }
+ }
+ return 0L;
+ }
+
+ public void loadFromFileToBuffer(ByteBuffer dst, int pageIndex) throws
IOException {
+ dst.clear();
+ if (!readChannel.isOpen()) {
+ readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
+ }
+ readChannel.read(dst, getPageAddress(pageIndex));
+ }
+
+ // region Flush Strategy
+ @FunctionalInterface
+ interface FlushPageStrategy {
+ void apply(List<ISchemaPage> dirtyPages) throws IOException;
+ }
+
+ @FunctionalInterface
+ interface SinglePageFlushStrategy {
+ void apply(ISchemaPage page) throws IOException;
+ }
+
+ private void flushDirtyPagesWithLogging(List<ISchemaPage> dirtyPages) throws
IOException {
+ if (dirtyPages.size() == 0) {
+ return;
+ }
+
+ if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
+ logWriter = logWriter.renew();
+ logCounter.set(0);
+ }
+
+ logCounter.addAndGet(dirtyPages.size());
+ for (ISchemaPage page : dirtyPages) {
+ page.syncPageBuffer();
+ logWriter.write(page);
+ }
+ logWriter.prepare();
+
+ for (ISchemaPage page : dirtyPages) {
+ page.flushPageToChannel(channel);
+ }
+ logWriter.commit();
+ }
+
+ private void flushSinglePageWithLogging(ISchemaPage page) throws IOException
{
+ if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
+ logWriter = logWriter.renew();
+ logCounter.set(0);
+ }
+
+ logCounter.addAndGet(1);
+ page.syncPageBuffer();
+ logWriter.write(page);
+ logWriter.prepare();
+ page.flushPageToChannel(channel);
+ logWriter.commit();
+ }
+
+ private void flushDirtyPagesWithoutLogging(List<ISchemaPage> dirtyPages)
throws IOException {
+ for (ISchemaPage page : dirtyPages) {
+ page.syncPageBuffer();
+ page.flushPageToChannel(channel);
+ }
+ }
+
+ private void flushSinglePageWithoutLogging(ISchemaPage page) throws
IOException {
+ page.syncPageBuffer();
+ page.flushPageToChannel(channel);
+ }
+
+ public synchronized void flushMultiPages(SchemaPageContext cxt) throws
IOException {
+ flushDirtyPagesStrategy.apply(
+ cxt.referredPages.values().stream()
+ .filter(ISchemaPage::isDirtyPage)
+ .collect(Collectors.toList()));
+ }
+
+ public void flushSinglePage(ISchemaPage page) throws IOException {
+ singlePageFlushStrategy.apply(page);
+ }
+ // endregion
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIndexSortBuckets.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIndexSortBuckets.java
new file mode 100644
index 00000000000..a0cf716dcc3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIndexSortBuckets.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Index buckets affiliated with a page collection. Indexes are sorted into
buckets according to the spare
+ * space of their corresponding page instances.
+ */
+class PageIndexSortBuckets {
+ private final short[] bounds;
+ private final ArrayDeque[] buckets;
+ private final Map<Integer, ISchemaPage> pageContainer;
+
+ public PageIndexSortBuckets(short[] borders, Map<Integer, ISchemaPage>
container) {
+ bounds = Arrays.copyOf(borders, borders.length);
+ buckets = new ArrayDeque[borders.length];
+ pageContainer = container;
+ for (int i = 0; i < borders.length; i++) {
+ buckets[i] = new ArrayDeque();
+ }
+ }
+
+ public void clear() {
+ for (ArrayDeque q : buckets) {
+ q.clear();
+ }
+ }
+
+ public void sortIntoBucket(ISchemaPage page, short newSegSize) {
+ if (page.getAsSegmentedPage() == null) {
+ return;
+ }
+
+ // actual space occupied by a segment includes both its own length and the
length of its
+ // offset, so the available length for a segment is the spareSize minus the
offset bytes
+ short availableSize =
+ newSegSize < 0
+ ? (short) (page.getAsSegmentedPage().getSpareSize() -
SchemaFileConfig.SEG_OFF_DIG)
+ : (short)
+ (page.getAsSegmentedPage().getSpareSize()
+ - newSegSize
+ - SchemaFileConfig.SEG_OFF_DIG);
+
+ // too small to index
+ if (availableSize <= SchemaFileConfig.SEG_HEADER_SIZE) {
+ return;
+ }
+
+ // be like: SEG_HEADER < buckets[0] <= bounds[0] < buckets[1] <= ...
+ for (int i = 0; i < bounds.length; i++) {
+ // the last of SEG_SIZE_LST is the maximum page size, definitely larger
than others
+ if (availableSize <= bounds[i]) {
+ buckets[i].add(page.getPageIndex());
+ return;
+ }
+ }
+ }
+
+ public ArrayDeque<Integer> getBucket(int index) {
+ return buckets[index];
+ }
+
+ /**
+ * @param withLock set if page container is a global/shared object
+ * @return the nearest fit page, or null if none is found; its index is removed from the bucket.
+ */
+ public synchronized ISchemaPage getNearestFitPage(short size, boolean
withLock) {
+ ISchemaPage targetPage;
+ int elemToCheck;
+ for (int i = 0; i < buckets.length && pageContainer.size() > 0; i++) {
+ // buckets[i] stores pages with spare space less than bounds[i]
+ elemToCheck = buckets[i].size();
+ while (size < bounds[i] && elemToCheck > 0) {
+ // find roughly fit page
+ targetPage = pageContainer.getOrDefault(buckets[i].poll(), null);
+ elemToCheck--;
+
+ if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
+ // act as lazy remove on buckets
+ continue;
+ }
+
+ // seek in global container thus other page could be read locked
+ if (withLock
+ && targetPage.getAsSegmentedPage().isCapableForSegSize(size)
+ && targetPage.getLock().writeLock().tryLock()) {
+ if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+ return targetPage.getAsSegmentedPage();
+ }
+ targetPage.getLock().writeLock().unlock();
+ }
+
+ // only in local dirty pages which are always write locked by itself
+ if (!withLock &&
targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+ return targetPage;
+ }
+
+ // not as large as expected, re-sort it into a suitable bucket
+ if (i > 0 &&
targetPage.getAsSegmentedPage().isCapableForSegSize(bounds[0])) {
+ sortIntoBucket(targetPage, (short) -1);
+ }
+ }
+ }
+ return null;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index acd41d192af..ad92a332d57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -32,35 +32,18 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafil
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SegmentedPage;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -76,63 +59,29 @@ import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.sc
public abstract class PageManager implements IPageManager {
protected static final Logger logger =
LoggerFactory.getLogger(PageManager.class);
- protected final Map<Integer, ISchemaPage> pageInstCache;
- // bucket for quick retrieval, only append when write operation finished
- protected final PageIndexSortBuckets pageIndexBuckets;
-
- protected final Lock cacheLock;
- protected final Condition cacheFull;
+ protected final PagePool pagePool;
+ protected final PageIOChannel pageIOChannel;
protected final Map<Long, SchemaPageContext> threadContexts;
protected final AtomicInteger lastPageIndex;
- private final FileChannel channel;
-
- // handle timeout interruption during reading
- private final File pmtFile;
- private FileChannel readChannel;
-
- private final AtomicInteger logCounter;
- private SchemaFileLogWriter logWriter;
private SchemaRegionCachedMetric metric = null;
- // flush strategy is dependent on consensus protocol, only check protocol on
init
- protected FlushPageStrategy flushDirtyPagesStrategy;
- protected SinglePageFlushStrategy singlePageFlushStrategy;
-
PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String
logPath)
throws IOException, MetadataException {
- this.pageInstCache =
- Collections.synchronizedMap(new
LinkedHashMap<>(SchemaFileConfig.PAGE_CACHE_SIZE, 1, true));
- this.pageIndexBuckets = new
PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, pageInstCache);
+ this.pagePool = new PagePool();
this.threadContexts = new ConcurrentHashMap<>();
-
- this.cacheLock = new ReentrantLock();
- this.cacheFull = this.cacheLock.newCondition();
-
this.lastPageIndex =
lastPageIndex >= 0 ? new AtomicInteger(lastPageIndex) : new
AtomicInteger(0);
- this.channel = channel;
- this.pmtFile = pmtFile;
- this.readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
if (IoTDBDescriptor.getInstance()
.getConfig()
.getSchemaRegionConsensusProtocolClass()
.equals(ConsensusFactory.RATIS_CONSENSUS)) {
- // with RATIS enabled, integrity is guaranteed by consensus protocol
- logCounter = new AtomicInteger();
- logWriter = null;
- flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
- singlePageFlushStrategy = this::flushSinglePageWithoutLogging;
+ pageIOChannel = new PageIOChannel(channel, pmtFile, false, logPath);
} else {
- // without RATIS, utilize physical logging for integrity
- int pageAcc = (int) recoverFromLog(logPath) /
SchemaFileConfig.PAGE_LENGTH;
- this.logWriter = new SchemaFileLogWriter(logPath);
- logCounter = new AtomicInteger(pageAcc);
- flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
- singlePageFlushStrategy = this::flushSinglePageWithLogging;
+ pageIOChannel = new PageIOChannel(channel, pmtFile, true, logPath);
}
// construct first page if file to init
@@ -140,81 +89,17 @@ public abstract class PageManager implements IPageManager {
ISegmentedPage rootPage =
ISchemaPage.initSegmentedPage(ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
0);
rootPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
- pageInstCache.put(rootPage.getPageIndex(), rootPage);
+ pagePool.put(rootPage);
rootPage.syncPageBuffer();
rootPage.flushPageToChannel(channel);
}
}
- /** Load bytes from log, deserialize and flush directly into channel, return
current length */
- private long recoverFromLog(String logPath) throws IOException,
MetadataException {
- SchemaFileLogReader reader = new SchemaFileLogReader(logPath);
- ISchemaPage page;
- List<byte[]> res = reader.collectUpdatedEntries();
- for (byte[] entry : res) {
- // TODO check bytes semantic correctness with CRC32 or other way
- page = ISchemaPage.loadSchemaPage(ByteBuffer.wrap(entry));
- page.flushPageToChannel(this.channel);
- }
- reader.close();
-
- // complete log file
- if (!res.isEmpty()) {
- try (FileOutputStream outputStream = new FileOutputStream(logPath,
true)) {
- outputStream.write(new byte[] {SchemaFileConfig.SF_COMMIT_MARK});
- return outputStream.getChannel().size();
- }
- }
- return 0L;
- }
-
- /**
- * A rough cache size guardian, all threads passed this entrant check will
not be limited with
- * cache size anymore. TODO A better guardian is based on constraint per
thread.
- */
- protected void cacheGuardian() {
- cacheLock.lock();
- try {
- while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
- try {
- // try to evict by LRU
- Iterator<ISchemaPage> iterator = pageInstCache.values().iterator();
- int pageSizeLimit = SchemaFileConfig.PAGE_CACHE_SIZE, size =
pageInstCache.size();
-
- ISchemaPage p;
- while (iterator.hasNext()) {
- p = iterator.next();
-
- if (size <= pageSizeLimit) {
- break;
- }
-
- if (p.getRefCnt().get() == 0) {
- iterator.remove();
- size--;
- }
- }
-
- if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
- // wait until another operation finished and released pages
- cacheFull.await();
- }
- } catch (InterruptedException e) {
- logger.warn(
- "Interrupted during page cache eviction. Consider increasing
cache size, "
- + "reducing concurrency, or extending timeout");
- }
- }
- } finally {
- cacheLock.unlock();
- }
- }
-
@Override
public void writeMNode(ICachedMNode node) throws MetadataException,
IOException {
SchemaPageContext cxt = new SchemaPageContext();
threadContexts.put(Thread.currentThread().getId(), cxt);
- cacheGuardian();
+ pagePool.cacheGuardian();
entrantLock(node, cxt);
try {
writeNewChildren(node, cxt);
@@ -222,7 +107,7 @@ public abstract class PageManager implements IPageManager {
flushDirtyPages(cxt);
} finally {
releaseLocks(cxt);
- releaseReferent(cxt);
+ pagePool.releaseReferent(cxt);
threadContexts.remove(Thread.currentThread().getId(), cxt);
}
}
@@ -234,33 +119,6 @@ public abstract class PageManager implements IPageManager {
}
}
- /** release referents and evict likely useless page if necessary */
- protected void releaseReferent(SchemaPageContext cxt) {
- for (ISchemaPage p : cxt.referredPages.values()) {
- p.decrementAndGetRefCnt();
- }
-
- if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
- cacheLock.lock();
- try {
- for (ISchemaPage p : cxt.referredPages.values()) {
- // unnecessary to evict the page object in context by 2 case:
- // 1. it is held by another thread, e.g., RefCnt != 0
- // 2. it had already been evicted, e.g., pageCache.get(id) != page
- if (p.getRefCnt().get() == 0 && pageInstCache.get(p.getPageIndex())
== p) {
- pageInstCache.remove(p.getPageIndex());
- }
- }
-
- if (pageInstCache.size() <= SchemaFileConfig.PAGE_CACHE_SIZE) {
- cacheFull.signal();
- }
- } finally {
- cacheLock.unlock();
- }
- }
- }
-
/** locking in the order of page index to avoid deadlock. */
protected void entrantLock(ICachedMNode node, SchemaPageContext cxt)
throws IOException, MetadataException {
@@ -567,74 +425,13 @@ public abstract class PageManager implements IPageManager
{
// endregion
// region Flush Strategy
- @FunctionalInterface
- interface FlushPageStrategy {
- void apply(List<ISchemaPage> dirtyPages) throws IOException;
- }
-
- @FunctionalInterface
- interface SinglePageFlushStrategy {
- void apply(ISchemaPage page) throws IOException;
- }
-
- private void flushDirtyPagesWithLogging(List<ISchemaPage> dirtyPages) throws
IOException {
- if (dirtyPages.size() == 0) {
- return;
- }
-
- if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
- logWriter = logWriter.renew();
- logCounter.set(0);
- }
-
- logCounter.addAndGet(dirtyPages.size());
- for (ISchemaPage page : dirtyPages) {
- page.syncPageBuffer();
- logWriter.write(page);
- }
- logWriter.prepare();
-
- for (ISchemaPage page : dirtyPages) {
- page.flushPageToChannel(channel);
- }
- logWriter.commit();
- }
-
- private void flushSinglePageWithLogging(ISchemaPage page) throws IOException
{
- if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
- logWriter = logWriter.renew();
- logCounter.set(0);
- }
-
- logCounter.addAndGet(1);
- page.syncPageBuffer();
- logWriter.write(page);
- logWriter.prepare();
- page.flushPageToChannel(channel);
- logWriter.commit();
- }
-
- private void flushDirtyPagesWithoutLogging(List<ISchemaPage> dirtyPages)
throws IOException {
- for (ISchemaPage page : dirtyPages) {
- page.syncPageBuffer();
- page.flushPageToChannel(channel);
- }
- }
-
- private void flushSinglePageWithoutLogging(ISchemaPage page) throws
IOException {
- page.syncPageBuffer();
- page.flushPageToChannel(channel);
- }
public synchronized void flushDirtyPages(SchemaPageContext cxt) throws
IOException {
if (cxt.dirtyCnt == 0) {
return;
}
- flushDirtyPagesStrategy.apply(
- cxt.referredPages.values().stream()
- .filter(ISchemaPage::isDirtyPage)
- .collect(Collectors.toList()));
- cxt.appendBucketIndex(pageIndexBuckets);
+ pageIOChannel.flushMultiPages(cxt);
+ pagePool.appendBucketIndex(cxt);
if (metric != null) {
metric.recordFlushPageNum(cxt.referredPages.size());
}
@@ -645,6 +442,7 @@ public abstract class PageManager implements IPageManager {
* the lastLeaf, unlock, deref, and remove it from dirtyPages. lastLeafPage
only initiated at
* overflowOperation
*/
+ @Deprecated
private void interleavedFlush(ISchemaPage page, SchemaPageContext cxt)
throws IOException {
if (cxt.lastLeafPage == null || cxt.lastLeafPage.getPageIndex() ==
page.getPageIndex()) {
return;
@@ -653,7 +451,7 @@ public abstract class PageManager implements IPageManager {
if (metric != null) {
metric.recordFlushPageNum(1);
}
- singlePageFlushStrategy.apply(cxt.lastLeafPage);
+ pageIOChannel.flushSinglePage(cxt.lastLeafPage);
// this lastLeaf shall only be lock once
cxt.dirtyCnt--;
@@ -667,7 +465,7 @@ public abstract class PageManager implements IPageManager {
// can be reclaimed since the page only referred by pageInstCache
cxt.referredPages.remove(cxt.lastLeafPage.getPageIndex());
// alleviate eviction pressure
- pageInstCache.remove(cxt.lastLeafPage.getPageIndex());
+ pagePool.remove(cxt.lastLeafPage.getPageIndex());
cxt.lastLeafPage = page.getAsSegmentedPage();
}
@@ -681,9 +479,9 @@ public abstract class PageManager implements IPageManager {
@Override
public void clear() throws IOException, MetadataException {
- pageInstCache.clear();
+ pagePool.clear();
lastPageIndex.set(0);
- logWriter = logWriter == null ? null : logWriter.renew();
+ pageIOChannel.renewLogWriter();
}
@Override
@@ -700,9 +498,7 @@ public abstract class PageManager implements IPageManager {
@Override
public void close() throws IOException {
- if (logWriter != null) {
- logWriter.close();
- }
+ pageIOChannel.closeLogWriter();
}
// endregion
@@ -723,9 +519,9 @@ public abstract class PageManager implements IPageManager {
// lock for no duplicate page with same index from disk, and guarantees
page will not be evicted
// by other thread before referred by current thread
- cacheLock.lock();
+ pagePool.lock();
try {
- ISchemaPage page = pageInstCache.get(pageIdx);
+ ISchemaPage page = pagePool.get(pageIdx);
if (page != null) {
cxt.refer(page);
return page;
@@ -735,13 +531,13 @@ public abstract class PageManager implements IPageManager
{
if (metric != null) {
metric.recordLoadPageNum(1);
}
- loadFromFile(newBuf, pageIdx);
+ pageIOChannel.loadFromFileToBuffer(newBuf, pageIdx);
page = ISchemaPage.loadSchemaPage(newBuf);
cxt.refer(page);
- addPageToCache(page);
+ pagePool.put(page);
return page;
} finally {
- cacheLock.unlock();
+ pagePool.unlock();
}
}
@@ -754,7 +550,7 @@ public abstract class PageManager implements IPageManager {
protected ISchemaPage replacePageInCache(ISchemaPage page, SchemaPageContext
cxt) {
// no need to lock since the root of B+Tree is locked
cxt.markDirty(page, true);
- addPageToCache(page);
+ pagePool.put(page);
return page;
}
@@ -773,10 +569,10 @@ public abstract class PageManager implements IPageManager
{
return targetPage.getAsSegmentedPage();
}
- cacheLock.lock();
+ pagePool.lock();
try {
// pageIndexBuckets sorts pages within pageInstCache into buckets to
expedite access
- targetPage = pageIndexBuckets.getNearestFitPage(size, true);
+ targetPage = pagePool.getNearestFitPage(size);
if (targetPage != null) {
cxt.markDirty(targetPage);
cxt.traceLock(targetPage);
@@ -792,7 +588,7 @@ public abstract class PageManager implements IPageManager {
cxt.indexBuckets.sortIntoBucket(targetPage, size);
return targetPage.getAsSegmentedPage();
} finally {
- cacheLock.unlock();
+ pagePool.unlock();
}
}
@@ -804,7 +600,7 @@ public abstract class PageManager implements IPageManager {
cxt.markDirty(newPage);
newPage.getLock().writeLock().lock();
cxt.traceLock(newPage);
- addPageToCache(newPage);
+ pagePool.put(newPage);
return newPage;
}
@@ -812,25 +608,11 @@ public abstract class PageManager implements IPageManager
{
// new page will be LOCAL until the creating thread finished
// thus not added to sparsePageIndex now
page.setPageIndex(lastPageIndex.incrementAndGet());
- addPageToCache(page);
+ pagePool.put(page);
cxt.markDirty(page);
return page;
}
- protected ISchemaPage addPageToCache(ISchemaPage page) {
- // size control is left to operation entrance
- // return value could use to assess whether key already existed
- return this.pageInstCache.put(page.getPageIndex(), page);
- }
-
- private int loadFromFile(ByteBuffer dst, int pageIndex) throws IOException {
- dst.clear();
- if (!readChannel.isOpen()) {
- readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
- }
- return readChannel.read(dst, getPageAddress(pageIndex));
- }
-
private void updateParentalRecord(
ICachedMNode parent, String key, long newSegAddr, SchemaPageContext cxt)
throws IOException, MetadataException {
@@ -978,186 +760,6 @@ public abstract class PageManager implements IPageManager
{
// endregion
- /** Thread local variables about write/update process. */
- protected static class SchemaPageContext {
- final long threadID;
- final PageIndexSortBuckets indexBuckets;
- // locked and dirty pages are all referred pages, they all reside in page
cache
- final Map<Integer, ISchemaPage> referredPages;
- final Set<Integer> lockTraces;
- // track B+Tree traversal trace
- final int[] treeTrace;
- int dirtyCnt;
- int interleavedFlushCnt;
-
- // flush B+Tree leaf before operation finished since all records are
ordered
- ISegmentedPage lastLeafPage;
-
- public SchemaPageContext() {
- threadID = Thread.currentThread().getId();
- referredPages = new HashMap<>();
- indexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST,
referredPages);
- treeTrace = new int[16];
- lockTraces = new HashSet<>();
- lastLeafPage = null;
- dirtyCnt = 0;
- interleavedFlushCnt = 0;
- }
-
- protected void markDirty(ISchemaPage page) {
- markDirty(page, false);
- }
-
- protected void markDirty(ISchemaPage page, boolean forceReplace) {
- if (!page.isDirtyPage()) {
- dirtyCnt++;
- }
- page.setDirtyFlag();
- refer(page);
- if (forceReplace && referredPages.containsKey(page.getPageIndex())) {
-
- // previous page is dirty, so it's not a new dirty page
- if (referredPages.get(page.getPageIndex()).isDirtyPage()) {
- dirtyCnt--;
- }
- // force to replace
- referredPages.put(page.getPageIndex(), page);
- }
- }
-
- protected void traceLock(ISchemaPage page) {
- refer(page);
- lockTraces.add(page.getPageIndex());
- }
-
- // referred pages will not be evicted until operation finished
- private void refer(ISchemaPage page) {
- if (!referredPages.containsKey(page.getPageIndex())) {
- page.incrementAndGetRefCnt();
- referredPages.put(page.getPageIndex(), page);
- }
- }
-
- /**
- * Since records are ordered for write operation, it is reasonable to
flush those left siblings
- * of the active leaf page. The target page would be initiated at the
first split within the
- * operation.
- *
- * @param page left leaf of the split
- */
- public void invokeLastLeaf(ISchemaPage page) {
- // only record at the first split
- if (lastLeafPage == null) {
- lastLeafPage = page.getAsSegmentedPage();
- }
- }
-
- private void appendBucketIndex(PageIndexSortBuckets pisb) {
- for (int i = 0; i < indexBuckets.buckets.length; i++) {
- pisb.buckets[i].addAll(indexBuckets.buckets[i]);
- }
- }
- }
-
- /**
- * * Index buckets affiliated to a page collection. Indexes are sort into
buckets according to
- * spare space of its corresponding page instance.
- */
- protected static class PageIndexSortBuckets {
- private final short[] bounds;
- private final LinkedList<Integer>[] buckets;
- private final Map<Integer, ISchemaPage> pageContainer;
-
- public PageIndexSortBuckets(short[] borders, Map<Integer, ISchemaPage>
container) {
- bounds = Arrays.copyOf(borders, borders.length);
- buckets = (LinkedList<Integer>[]) new LinkedList[borders.length];
- pageContainer = container;
- for (int i = 0; i < borders.length; i++) {
- buckets[i] = new LinkedList<>();
- }
- }
-
- public void clear() {
- for (Queue<Integer> q : buckets) {
- q.clear();
- }
- }
-
- public void sortIntoBucket(ISchemaPage page, short newSegSize) {
- if (page.getAsSegmentedPage() == null) {
- return;
- }
-
- // actual space occupied by a segment includes both its own length and
the length of its
- // offset. so available length for a segment is the spareSize minus the
offset bytes
- short availableSize =
- newSegSize < 0
- ? (short) (page.getAsSegmentedPage().getSpareSize() -
SchemaFileConfig.SEG_OFF_DIG)
- : (short)
- (page.getAsSegmentedPage().getSpareSize()
- - newSegSize
- - SchemaFileConfig.SEG_OFF_DIG);
-
- // too small to index
- if (availableSize <= SchemaFileConfig.SEG_HEADER_SIZE) {
- return;
- }
-
- // be like: SEG_HEADER < buckets[0] <= bounds[0] < buckets[1] <= ...
- for (int i = 0; i < bounds.length; i++) {
- // the last of SEG_SIZE_LST is the maximum page size, definitely
larger than others
- if (availableSize <= bounds[i]) {
- buckets[i].add(page.getPageIndex());
- return;
- }
- }
- }
-
- /**
- * @param withLock set if page container is a global/shared object
- * @return the page index will be removed from the bucket.
- */
- public synchronized ISchemaPage getNearestFitPage(short size, boolean
withLock) {
- ISchemaPage targetPage;
- int elemToCheck;
- for (int i = 0; i < buckets.length && pageContainer.size() > 0; i++) {
- // buckets[i] stores pages with spare space less than bounds[i]
- elemToCheck = buckets[i].size();
- while (size < bounds[i] && elemToCheck > 0) {
- // find roughly fit page
- targetPage = pageContainer.getOrDefault(buckets[i].poll(), null);
- elemToCheck--;
-
- if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
- // act as lazy remove on buckets
- continue;
- }
-
- // seek in global container thus other page could be read locked
- if (withLock
- && targetPage.getAsSegmentedPage().isCapableForSegSize(size)
- && targetPage.getLock().writeLock().tryLock()) {
- if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
- return targetPage.getAsSegmentedPage();
- }
- targetPage.getLock().writeLock().unlock();
- }
-
- // only in local dirty pages which are always write locked by itself
- if (!withLock &&
targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
- return targetPage;
- }
-
- // not large as expected, fit into suitable bucket
- if (i > 0 &&
targetPage.getAsSegmentedPage().isCapableForSegSize(bounds[0])) {
- sortIntoBucket(targetPage, (short) -1);
- }
- }
- }
- return null;
- }
- }
-
@Override
public void setMetric(SchemaRegionCachedMetric metric) {
this.metric = metric;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
new file mode 100644
index 00000000000..7cd9d4d5860
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PagePool.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * In-memory pool of schema page instances, keyed by page index.
+ *
+ * <p>Pages live in an access-ordered {@link LinkedHashMap} wrapped by {@link
+ * Collections#synchronizedMap}, so single-call operations such as {@link #put} and {@link #get}
+ * are serialized on the map itself. Eviction and the cache-full wait/notify handshake are
+ * additionally guarded by {@code cacheLock}.
+ */
+public class PagePool {
+  private static final Logger logger = LoggerFactory.getLogger(PagePool.class);
+  private final Map<Integer, ISchemaPage> pageInstCache;
+  private final Lock cacheLock;
+  private final Condition cacheFull;
+
+  private final PageIndexSortBuckets pageIndexBuckets;
+
+  PagePool() {
+    this.pageInstCache =
+        Collections.synchronizedMap(new LinkedHashMap<>(SchemaFileConfig.PAGE_CACHE_SIZE, 1, true));
+    this.pageIndexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, pageInstCache);
+
+    this.cacheLock = new ReentrantLock();
+    this.cacheFull = this.cacheLock.newCondition();
+  }
+
+  /**
+   * A rough cache size guardian: blocks the caller while the pool holds more than {@code
+   * SchemaFileConfig.PAGE_CACHE_SIZE} pages, first evicting pages whose reference count is zero.
+   * Threads that have passed this entrance check are no longer limited by cache size. TODO A
+   * better guardian is based on constraint per thread.
+   */
+  public void cacheGuardian() {
+    cacheLock.lock();
+    try {
+      while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+        try {
+          // try to evict by LRU
+          evictUnreferencedPages();
+
+          if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+            // wait until another operation finished and released pages
+            cacheFull.await();
+          }
+        } catch (InterruptedException e) {
+          // NOTE(review): the interrupt is logged and the loop retries, exactly as before.
+          // Restoring the interrupt flag here would make the next await() throw immediately,
+          // so changing this policy needs an explicit exit path - confirm with the callers.
+          logger.warn(
+              "Interrupted during page cache eviction. Consider increasing cache size, "
+                  + "reducing concurrency, or extending timeout");
+        }
+      }
+    } finally {
+      cacheLock.unlock();
+    }
+  }
+
+  /**
+   * Removes pages whose reference count is zero, in the map's iteration order, until the pool is
+   * back within {@code SchemaFileConfig.PAGE_CACHE_SIZE} or no candidate remains.
+   */
+  private void evictUnreferencedPages() {
+    // Traversing a view of a Collections.synchronizedMap requires holding the map's monitor:
+    // put/get/remove/clear below do not take cacheLock, and even get() reorders an
+    // access-ordered LinkedHashMap, so without this another thread could modify the map
+    // in the middle of the iteration.
+    synchronized (pageInstCache) {
+      Iterator<ISchemaPage> iterator = pageInstCache.values().iterator();
+      while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE && iterator.hasNext()) {
+        if (iterator.next().getRefCnt().get() == 0) {
+          iterator.remove();
+        }
+      }
+    }
+  }
+
+  /** Registers {@code page} under its page index; size control is left to the entrance check. */
+  public void put(ISchemaPage page) {
+    pageInstCache.put(page.getPageIndex(), page);
+  }
+
+  /** Acquires the pool-wide cache lock; must be paired with {@link #unlock()}. */
+  public void lock() {
+    cacheLock.lock();
+  }
+
+  /** Releases the pool-wide cache lock taken by {@link #lock()}. */
+  public void unlock() {
+    cacheLock.unlock();
+  }
+
+  /**
+   * @param index page index to look up
+   * @return the pooled page registered under {@code index}, or {@code null} if there is none
+   */
+  public ISchemaPage get(int index) {
+    return pageInstCache.get(index);
+  }
+
+  /**
+   * Asks the pool's index buckets for a page fitting {@code expectedSize}, passing {@code true}
+   * as the bucket's second argument.
+   *
+   * @param expectedSize size the caller needs
+   * @return whatever the bucket lookup returns
+   */
+  public ISchemaPage getNearestFitPage(short expectedSize) {
+    return pageIndexBuckets.getNearestFitPage(expectedSize, true);
+  }
+
+  /** Drops the page registered under {@code index}, if any. */
+  public void remove(int index) {
+    pageInstCache.remove(index);
+  }
+
+  /** Drops every pooled page. */
+  public void clear() {
+    pageInstCache.clear();
+  }
+
+  /** Lets {@code cxt} append its bucket indexes to the pool's index buckets. */
+  public void appendBucketIndex(SchemaPageContext cxt) {
+    cxt.appendBucketIndex(pageIndexBuckets);
+  }
+
+  /**
+   * Releases the references held by {@code cxt} and, if the pool is over capacity, evicts the
+   * pages of that context which are no longer in use.
+   *
+   * @param cxt context whose referred pages are released
+   */
+  protected void releaseReferent(SchemaPageContext cxt) {
+    for (ISchemaPage p : cxt.referredPages.values()) {
+      p.decrementAndGetRefCnt();
+    }
+
+    if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+      cacheLock.lock();
+      try {
+        for (ISchemaPage p : cxt.referredPages.values()) {
+          // a page of this context is left alone in two cases:
+          // 1. it is still held by another thread, i.e., RefCnt != 0
+          // 2. it had already been evicted or replaced, i.e., pageCache.get(id) != page
+          if (p.getRefCnt().get() == 0 && pageInstCache.get(p.getPageIndex()) == p) {
+            pageInstCache.remove(p.getPageIndex());
+          }
+        }
+
+        if (pageInstCache.size() <= SchemaFileConfig.PAGE_CACHE_SIZE) {
+          // Wake every waiter, not just one: once the pool is within capacity, later releases
+          // skip this block and never signal, so the remaining waiters would stay parked.
+          // Each waiter re-checks the size in its own loop, so a broad wake-up is safe.
+          cacheFull.signalAll();
+        }
+      } finally {
+        cacheLock.unlock();
+      }
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/SchemaPageContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/SchemaPageContext.java
new file mode 100644
index 00000000000..d494be5a097
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/SchemaPageContext.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISegmentedPage;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Bookkeeping for a single write/update process: the pages it refers to, the locks it has
+ * traced, its dirty-page counters and its B+Tree traversal trace. The id of the constructing
+ * thread is recorded in {@code threadID}.
+ */
+class SchemaPageContext {
+  final long threadID;
+  final PageIndexSortBuckets indexBuckets;
+  // every locked or dirty page is a referred page, and all of them reside in page cache
+  final Map<Integer, ISchemaPage> referredPages;
+  final Set<Integer> lockTraces;
+  // track B+Tree traversal trace
+  final int[] treeTrace;
+  int dirtyCnt;
+  int interleavedFlushCnt;
+
+  // flush B+Tree leaf before operation finished since all records are ordered
+  ISegmentedPage lastLeafPage;
+
+  public SchemaPageContext() {
+    this.threadID = Thread.currentThread().getId();
+    this.referredPages = new HashMap<>();
+    this.indexBuckets =
+        new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, this.referredPages);
+    this.treeTrace = new int[16];
+    this.lockTraces = new HashSet<>();
+    this.lastLeafPage = null;
+    this.dirtyCnt = 0;
+    this.interleavedFlushCnt = 0;
+  }
+
+  /** Marks {@code page} dirty without forcing a replacement of the referred instance. */
+  public void markDirty(ISchemaPage page) {
+    markDirty(page, false);
+  }
+
+  /**
+   * Sets the dirty flag on {@code page}, counts it if it was clean, and registers it as referred.
+   *
+   * @param page page to mark
+   * @param forceReplace when set, {@code page} overwrites the instance currently referred under
+   *     the same page index
+   */
+  public void markDirty(ISchemaPage page, boolean forceReplace) {
+    dirtyCnt += page.isDirtyPage() ? 0 : 1;
+    page.setDirtyFlag();
+    refer(page);
+
+    if (!forceReplace) {
+      return;
+    }
+
+    final int pageIndex = page.getPageIndex();
+    if (!referredPages.containsKey(pageIndex)) {
+      return;
+    }
+
+    // the instance already referred is dirty, so this one is not a new dirty page
+    if (referredPages.get(pageIndex).isDirtyPage()) {
+      dirtyCnt--;
+    }
+    // force to replace
+    referredPages.put(pageIndex, page);
+  }
+
+  /** Registers {@code page} as referred and records its index among the traced locks. */
+  public void traceLock(ISchemaPage page) {
+    refer(page);
+    lockTraces.add(page.getPageIndex());
+  }
+
+  /**
+   * Registers {@code page} as referred, bumping its reference count the first time its index is
+   * seen. Referred pages will not be evicted until operation finished.
+   */
+  public void refer(ISchemaPage page) {
+    final int pageIndex = page.getPageIndex();
+    if (referredPages.containsKey(pageIndex)) {
+      return;
+    }
+    page.incrementAndGetRefCnt();
+    referredPages.put(pageIndex, page);
+  }
+
+  /**
+   * Remembers the left leaf of the first split within the operation. Records are ordered for
+   * write operations, so the left siblings of the active leaf page are reasonable to flush
+   * early.
+   *
+   * @param page left leaf of the split
+   */
+  public void invokeLastLeaf(ISchemaPage page) {
+    // only the first split is recorded
+    if (lastLeafPage != null) {
+      return;
+    }
+    lastLeafPage = page.getAsSegmentedPage();
+  }
+
+  /**
+   * Appends the content of every bucket of this context to the bucket at the same position in
+   * {@code pisb}.
+   */
+  public void appendBucketIndex(PageIndexSortBuckets pisb) {
+    for (int bucket = 0; bucket < SchemaFileConfig.SEG_SIZE_LST.length; bucket++) {
+      pisb.getBucket(bucket).addAll(indexBuckets.getBucket(bucket));
+    }
+  }
+}