This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
new dd02cb89fee PBTree Improvement: concurrent operation and interleaved
flush (#11698)
dd02cb89fee is described below
commit dd02cb89fee1f868bc04f8aabbd28c91b91efe26
Author: ZhaoXin <[email protected]>
AuthorDate: Fri Dec 15 09:30:19 2023 +0800
PBTree Improvement: concurrent operation and interleaved flush (#11698)
---
.../mtree/impl/pbtree/CachedMTreeStore.java | 4 +-
.../mtree/impl/pbtree/schemafile/ISchemaPage.java | 35 +-
.../impl/pbtree/schemafile/ISegmentedPage.java | 2 +-
.../mtree/impl/pbtree/schemafile/InternalPage.java | 9 +
.../mtree/impl/pbtree/schemafile/SchemaFile.java | 9 +-
.../mtree/impl/pbtree/schemafile/SchemaPage.java | 47 +-
.../impl/pbtree/schemafile/SegmentedPage.java | 16 +-
.../schemafile/pagemgr/BTreePageManager.java | 379 ++++++----
.../pbtree/schemafile/pagemgr/IPageManager.java | 9 +-
.../pbtree/schemafile/pagemgr/PageManager.java | 821 ++++++++++++++-------
.../mtree/schemafile/SchemaFileLogTest.java | 4 +-
.../metadata/mtree/schemafile/SchemaFileTest.java | 1 -
12 files changed, 884 insertions(+), 452 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index d6ec24cb118..76aafffd435 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICa
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.iterator.CachedTraverserIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.MockSchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
@@ -80,8 +79,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
Runnable flushCallback)
throws MetadataException, IOException {
this.schemaRegionId = schemaRegionId;
- // file = SchemaFile.initSchemaFile(storageGroup.getFullPath(),
schemaRegionId);
- file = new MockSchemaFile(storageGroup);
+ file = SchemaFile.initSchemaFile(storageGroup.getFullPath(),
schemaRegionId);
root = file.init();
this.regionStatistics = regionStatistics;
this.memManager = new MemManager(regionStatistics);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
index 9a6b543734f..305c30fb94b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISchemaPage.java
@@ -27,6 +27,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public interface ISchemaPage {
/**
@@ -62,7 +65,8 @@ public interface ISchemaPage {
}
/** InternalPage should be initiated with a pointer which points to the
minimal child of it. */
- static SchemaPage initInternalPage(ByteBuffer buffer, int pageIndex, int
ptr) {
+ static SchemaPage initInternalPage(
+ ByteBuffer buffer, int pageIndex, int ptr, AtomicInteger ai,
ReadWriteLock rwl) {
buffer.clear();
buffer.position(SchemaFileConfig.PAGE_HEADER_SIZE);
@@ -84,10 +88,16 @@ public interface ISchemaPage {
ReadWriteIOUtils.write(-1L, buffer);
ReadWriteIOUtils.write(-1, buffer);
- return new InternalPage(buffer);
+ return new InternalPage(buffer, ai, rwl);
}
- static ISegmentedPage initSegmentedPage(ByteBuffer buffer, int pageIndex) {
+ static SchemaPage initInternalPage(ByteBuffer buffer, int pageIndex, int
ptr) {
+ return initInternalPage(
+ buffer, pageIndex, ptr, new AtomicInteger(), new
ReentrantReadWriteLock());
+ }
+
+ static ISegmentedPage initSegmentedPage(
+ ByteBuffer buffer, int pageIndex, AtomicInteger ai, ReadWriteLock rwl) {
buffer.clear();
ReadWriteIOUtils.write(SchemaFileConfig.SEGMENTED_PAGE, buffer);
ReadWriteIOUtils.write(pageIndex, buffer);
@@ -95,7 +105,12 @@ public interface ISchemaPage {
ReadWriteIOUtils.write((short) (buffer.capacity() -
SchemaFileConfig.PAGE_HEADER_SIZE), buffer);
ReadWriteIOUtils.write((short) 0, buffer);
ReadWriteIOUtils.write(-1L, buffer);
- return new SegmentedPage(buffer);
+ return new SegmentedPage(buffer, ai, rwl);
+ }
+
+ static ISegmentedPage initSegmentedPage(ByteBuffer buffer, int pageIndex) {
+ return ISchemaPage.initSegmentedPage(
+ buffer, pageIndex, new AtomicInteger(), new ReentrantReadWriteLock());
}
static SchemaPage initAliasIndexPage(ByteBuffer buffer, int pageIndex) {
@@ -109,10 +124,18 @@ public interface ISchemaPage {
return new AliasIndexPage(buffer);
}
+ AtomicInteger getRefCnt();
+
+ ReadWriteLock getLock();
+
void syncPageBuffer();
void flushPageToChannel(FileChannel channel) throws IOException;
+ boolean isDirtyPage();
+
+ void setDirtyFlag();
+
void flushPageToStream(OutputStream stream) throws IOException;
String inspect() throws SegmentNotFoundException;
@@ -133,10 +156,6 @@ public interface ISchemaPage {
ByteBuffer getEntireSegmentSlice() throws MetadataException;
- void markDirty();
-
- boolean isDirty();
-
@TestOnly
WrappedSegment getSegmentOnTest(short idx) throws SegmentNotFoundException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
index 5d69f37dd6f..3e0f44db0e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
@@ -80,7 +80,7 @@ public interface ISegmentedPage extends ISchemaPage {
* Transplant designated segment from srcPage, to spare space of the page
*
* @param srcPage source page conveys source segment
- * @param segId id of the target segment
+ * @param segId id of the designated segment from the srcPage
* @param newSegSize size of new segment in this page
* @throws MetadataException if spare not enough, segment not found or
inconsistency
*/
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
index 724e97f67bf..f1c2d252f84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
/**
* A page which acts like a segment, manages index entry of the b+ tree
constructed by MNode with
@@ -67,6 +69,13 @@ public class InternalPage extends SchemaPage implements
ISegment<Integer, Intege
subIndexPage = ReadWriteIOUtils.readInt(pageBuffer);
}
+ /** compatible constructor for replacement */
+ public InternalPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock
rwl) {
+ super(pageBuffer, ai, rwl);
+ firstLeaf = ReadWriteIOUtils.readLong(pageBuffer);
+ subIndexPage = ReadWriteIOUtils.readInt(pageBuffer);
+ }
+
@Override
public int insertRecord(String key, Integer pointer) throws
RecordDuplicatedException {
// TODO: remove debug parameter INTERNAL_SPLIT_VALVE
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index c703e641345..b8ac83ff915 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -243,10 +243,7 @@ public class SchemaFile implements ISchemaFile {
node.getFullPath()));
}
}
-
- pageManager.writeNewChildren(node);
- pageManager.writeUpdatedChildren(node);
- pageManager.flushDirtyPages();
+ pageManager.writeMNode(node);
updateHeaderBuffer();
}
@@ -270,7 +267,6 @@ public class SchemaFile implements ISchemaFile {
@Override
public void close() throws IOException {
updateHeaderBuffer();
- pageManager.flushDirtyPages();
pageManager.close();
forceChannel();
channel.close();
@@ -279,7 +275,6 @@ public class SchemaFile implements ISchemaFile {
@Override
public void sync() throws IOException {
updateHeaderBuffer();
- pageManager.flushDirtyPages();
forceChannel();
}
@@ -414,7 +409,7 @@ public class SchemaFile implements ISchemaFile {
return (short) (globalIndex & SchemaFileConfig.SEG_INDEX_MASK);
}
- /** TODO: shall merge with {@linkplain PageManager#reEstimateSegSize} */
+ /** TODO: shall merge with PageManager#reEstimateSegSize */
static short reEstimateSegSize(int oldSize) {
for (short size : SchemaFileConfig.SEG_SIZE_LST) {
if (oldSize < size) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
index d72c80b6da3..d10b8c23fcf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
@@ -27,6 +27,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* {@link SchemaFile} manages a collection of SchemaPages, which acts like a
segment of index entry
@@ -37,6 +40,9 @@ public abstract class SchemaPage implements ISchemaPage {
// All other attributes are to describe this ByteBuffer
protected final ByteBuffer pageBuffer;
+ protected final AtomicInteger refCnt;
+ protected final ReadWriteLock lock;
+
protected int pageIndex; // only change when register buffer as a new page
protected short spareOffset; // bound of the variable length part in a
slotted structure
protected short spareSize; // traces spare space size simultaneously
@@ -44,19 +50,38 @@ public abstract class SchemaPage implements ISchemaPage {
protected boolean dirtyFlag = false; // any modification turns it true
- protected SchemaPage(ByteBuffer pageBuffer) {
+ /** for replacement page state transfer * */
+ protected SchemaPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock
rwl) {
this.pageBuffer = pageBuffer;
this.pageBuffer
.limit(this.pageBuffer.capacity())
.position(SchemaFileConfig.PAGE_HEADER_INDEX_OFFSET);
+ refCnt = ai;
+ lock = rwl;
+
pageIndex = ReadWriteIOUtils.readInt(this.pageBuffer);
spareOffset = ReadWriteIOUtils.readShort(this.pageBuffer);
spareSize = ReadWriteIOUtils.readShort(this.pageBuffer);
memberNum = ReadWriteIOUtils.readShort(this.pageBuffer);
}
+ /** compatible constructor with no existing ref or lock */
+ protected SchemaPage(ByteBuffer pageBuffer) {
+ this(pageBuffer, new AtomicInteger(), new ReentrantReadWriteLock());
+ }
+
+ @Override
+ public AtomicInteger getRefCnt() {
+ return refCnt;
+ }
+
+ @Override
+ public ReadWriteLock getLock() {
+ return lock;
+ }
+
@Override
public void syncPageBuffer() {
this.pageBuffer.limit(this.pageBuffer.capacity());
@@ -74,6 +99,16 @@ public abstract class SchemaPage implements ISchemaPage {
dirtyFlag = false;
}
+ @Override
+ public boolean isDirtyPage() {
+ return dirtyFlag;
+ }
+
+ @Override
+ public void setDirtyFlag() {
+ this.dirtyFlag = true;
+ }
+
@Override
public void flushPageToStream(OutputStream stream) throws IOException {
if (pageIndex < 0) {
@@ -129,16 +164,6 @@ public abstract class SchemaPage implements ISchemaPage {
return null;
}
- @Override
- public void markDirty() {
- dirtyFlag = true;
- }
-
- @Override
- public boolean isDirty() {
- return dirtyFlag;
- }
-
@Override
@TestOnly
public WrappedSegment getSegmentOnTest(short idx) throws
SegmentNotFoundException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
index 0082f7d3665..b5de61e9da6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
@@ -33,6 +33,9 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SegmentedPage extends SchemaPage implements ISegmentedPage {
@@ -67,8 +70,19 @@ public class SegmentedPage extends SchemaPage implements
ISegmentedPage {
* #deleteSegment} mentioned.
* </ul>
*/
+ public SegmentedPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock
rwl) {
+ super(pageBuffer, ai, rwl);
+ segCacheMap = new ConcurrentHashMap<>();
+ segOffsetLst = new ArrayList<>();
+
+ pageBuffer.position(pageBuffer.capacity() - SchemaFileConfig.SEG_OFF_DIG *
memberNum);
+ for (int idx = 0; idx < memberNum; idx++) {
+ segOffsetLst.add(ReadWriteIOUtils.readShort(pageBuffer));
+ }
+ }
+
public SegmentedPage(ByteBuffer pageBuffer) {
- super(pageBuffer);
+ super(pageBuffer, new AtomicInteger(), new ReentrantReadWriteLock());
segCacheMap = new ConcurrentHashMap<>();
segOffsetLst = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
index e80c4dfb1dd..7c0bfd6f66d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
@@ -49,10 +49,10 @@ public class BTreePageManager extends PageManager {
@Override
protected void multiPageInsertOverflowOperation(
- ISchemaPage curPage, String key, ByteBuffer childBuffer)
+ ISchemaPage curPage, String key, ByteBuffer childBuffer,
SchemaPageContext cxt)
throws MetadataException, IOException {
// cur is a leaf, split and set next ptr as link between
- ISegmentedPage newPage =
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ);
+ ISegmentedPage newPage =
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ, cxt);
newPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
String sk =
curPage
@@ -61,16 +61,16 @@ public class BTreePageManager extends PageManager {
curPage
.getAsSegmentedPage()
.setNextSegAddress((short) 0, getGlobalIndex(newPage.getPageIndex(),
(short) 0));
- markDirty(curPage);
- insertIndexEntryEntrance(curPage, newPage, sk);
+ cxt.markDirty(curPage);
+ insertIndexEntryEntrance(curPage, newPage, sk, cxt);
}
@Override
protected void multiPageUpdateOverflowOperation(
- ISchemaPage curPage, String key, ByteBuffer childBuffer)
+ ISchemaPage curPage, String key, ByteBuffer childBuffer,
SchemaPageContext cxt)
throws MetadataException, IOException {
- // split and update higer nodes
- ISegmentedPage splPage =
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ);
+ // even split and update higher nodes
+ ISegmentedPage splPage =
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ, cxt);
splPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
String sk = curPage.getAsSegmentedPage().splitWrappedSegment(null, null,
splPage, false);
curPage
@@ -85,7 +85,7 @@ public class BTreePageManager extends PageManager {
}
// insert index entry upward
- insertIndexEntryEntrance(curPage, splPage, sk);
+ insertIndexEntryEntrance(curPage, splPage, sk, cxt);
}
/**
@@ -96,8 +96,9 @@ public class BTreePageManager extends PageManager {
* @throws IOException
*/
@Override
- protected void buildSubIndex(ICachedMNode parNode) throws MetadataException,
IOException {
- ISchemaPage cursorPage =
getPageInstance(getPageIndex(getNodeAddress(parNode)));
+ protected void buildSubIndex(ICachedMNode parNode, SchemaPageContext cxt)
+ throws MetadataException, IOException {
+ ISchemaPage cursorPage =
getPageInstance(getPageIndex(getNodeAddress(parNode)), cxt);
if (cursorPage.getAsInternalPage() == null) {
throw new MetadataException("Subordinate index shall not build upon
single page segment.");
@@ -106,12 +107,12 @@ public class BTreePageManager extends PageManager {
ISchemaPage tPage = cursorPage; // reserve the top page to modify subIndex
ISchemaPage subIndexPage =
ISchemaPage.initAliasIndexPage(ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
-1);
- registerAsNewPage(subIndexPage);
+ registerAsNewPage(subIndexPage, cxt);
// transfer cursorPage to leaf page
while (cursorPage.getAsInternalPage() != null) {
cursorPage =
-
getPageInstance(getPageIndex(cursorPage.getAsInternalPage().getNextSegAddress()));
+
getPageInstance(getPageIndex(cursorPage.getAsInternalPage().getNextSegAddress()),
cxt);
}
long nextAddr = cursorPage.getAsSegmentedPage().getNextSegAddress((short)
0);
@@ -120,7 +121,7 @@ public class BTreePageManager extends PageManager {
// TODO: inefficient to build B+Tree up-to-bottom, improve further
while (!children.isEmpty() || nextAddr != -1L) {
if (children.isEmpty()) {
- cursorPage = getPageInstance(getPageIndex(nextAddr));
+ cursorPage = getPageInstance(getPageIndex(nextAddr), cxt);
nextAddr = cursorPage.getAsSegmentedPage().getNextSegAddress((short)
0);
children = cursorPage.getAsSegmentedPage().getChildren((short) 0);
}
@@ -130,7 +131,7 @@ public class BTreePageManager extends PageManager {
&& child.getAsMeasurementMNode().getAlias() != null) {
subIndexPage =
insertAliasIndexEntry(
- subIndexPage, child.getAsMeasurementMNode().getAlias(),
child.getName());
+ subIndexPage, child.getAsMeasurementMNode().getAlias(),
child.getName(), cxt);
}
}
@@ -138,30 +139,31 @@ public class BTreePageManager extends PageManager {
}
@Override
- protected void insertSubIndexEntry(int base, String key, String rec)
+ protected void insertSubIndexEntry(int base, String key, String rec,
SchemaPageContext cxt)
throws MetadataException, IOException {
- insertAliasIndexEntry(getPageInstance(base), key, rec);
+ insertAliasIndexEntry(getPageInstance(base, cxt), key, rec, cxt);
}
@Override
- protected void removeSubIndexEntry(int base, String oldAlias)
+ protected void removeSubIndexEntry(int base, String oldAlias,
SchemaPageContext cxt)
throws MetadataException, IOException {
- ISchemaPage tarPage = getTargetLeafPage(getPageInstance(base), oldAlias);
+ ISchemaPage tarPage = getTargetLeafPage(getPageInstance(base, cxt),
oldAlias, cxt);
tarPage.getAsAliasIndexPage().removeRecord(oldAlias);
}
@Override
- protected String searchSubIndexAlias(int base, String alias)
+ protected String searchSubIndexAlias(int base, String alias,
SchemaPageContext cxt)
throws MetadataException, IOException {
- return getTargetLeafPage(getPageInstance(base), alias)
+ return getTargetLeafPage(getPageInstance(base, cxt), alias, cxt)
.getAsAliasIndexPage()
.getRecordByAlias(alias);
}
/** @return top page to insert index */
- private ISchemaPage insertAliasIndexEntry(ISchemaPage topPage, String alias,
String name)
+ private ISchemaPage insertAliasIndexEntry(
+ ISchemaPage topPage, String alias, String name, SchemaPageContext cxt)
throws MetadataException, IOException {
- ISchemaPage tarPage = getTargetLeafPage(topPage, alias);
+ ISchemaPage tarPage = getTargetLeafPage(topPage, alias, cxt);
if (tarPage.getAsAliasIndexPage() == null) {
throw new MetadataException("File may be corrupted that subordinate
index has broken.");
}
@@ -174,23 +176,25 @@ public class BTreePageManager extends PageManager {
.getAsAliasIndexPage()
.splitByKey(alias, name, spltBuf,
SchemaFileConfig.INCLINED_SPLIT);
ISchemaPage splPage = ISchemaPage.loadSchemaPage(spltBuf);
- registerAsNewPage(splPage);
+ registerAsNewPage(splPage, cxt);
- if (treeTrace[0] < 1) {
- // From single sub-index page to tree structure
+ if (cxt.treeTrace[0] < 1) {
+ // Transfer single sub-index page to tree structure
ByteBuffer trsBuf = ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
tarPage.getAsAliasIndexPage().extendsTo(trsBuf);
ISchemaPage trsPage = ISchemaPage.loadSchemaPage(trsBuf);
// Notice that index of tarPage belongs to repPage then
- registerAsNewPage(trsPage);
+ registerAsNewPage(trsPage, cxt);
// tarPage abolished since then
ISchemaPage repPage =
ISchemaPage.initInternalPage(
ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
tarPage.getPageIndex(),
- trsPage.getPageIndex());
+ trsPage.getPageIndex(),
+ tarPage.getRefCnt(),
+ tarPage.getLock());
if (0 > repPage.getAsInternalPage().insertRecord(sk,
splPage.getPageIndex())) {
throw new ColossalRecordException(sk, alias);
@@ -200,11 +204,12 @@ public class BTreePageManager extends PageManager {
.getAsInternalPage()
.setNextSegAddress(getGlobalIndex(trsPage.getPageIndex(), (short)
0));
- replacePageInCache(repPage);
+ replacePageInCache(repPage, cxt);
return repPage;
} else {
- insertIndexEntryRecursiveUpwards(treeTrace[0], sk,
splPage.getPageIndex());
- return getPageInstance(treeTrace[1]);
+ // no interleaved flush implemented since alias of the incoming
records are NOT ordered
+ insertIndexEntryRecursiveUpwards(cxt.treeTrace[0], sk,
splPage.getPageIndex(), cxt);
+ return getPageInstance(cxt.treeTrace[1], cxt);
}
}
return topPage;
@@ -217,28 +222,43 @@ public class BTreePageManager extends PageManager {
* @param splPage new page for original page to split.
* @param sk least key of splPage.
*/
- private void insertIndexEntryEntrance(ISchemaPage curPage, ISchemaPage
splPage, String sk)
+ private void insertIndexEntryEntrance(
+ ISchemaPage curPage, ISchemaPage splPage, String sk, SchemaPageContext
cxt)
throws MetadataException, IOException {
- if (treeTrace[0] < 1) {
- ISegmentedPage trsPage =
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ);
+ if (cxt.treeTrace[0] < 1) {
+ // To make parent pointer valid after the btree established, curPage
need to be transformed
+ // into an InternalPage. Since NOW page CANNOT transform in place, a
substitute InternalPage
+ // inherits index from curPage initiated.
+ ISegmentedPage trsPage =
getMinApplSegmentedPageInMem(SchemaFileConfig.SEG_MAX_SIZ, cxt);
trsPage.transplantSegment(
curPage.getAsSegmentedPage(), (short) 0,
SchemaFileConfig.SEG_MAX_SIZ);
+
+ // repPage inherits lock and referent from curPage, which is always
locked by entrant.
ISchemaPage repPage =
ISchemaPage.initInternalPage(
ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
curPage.getPageIndex(),
- trsPage.getPageIndex());
+ trsPage.getPageIndex(),
+ curPage.getRefCnt(),
+ curPage.getLock());
+ // link right child of the initiated InternalPage
if (0 > repPage.getAsInternalPage().insertRecord(sk,
splPage.getPageIndex())) {
throw new ColossalRecordException(sk);
}
+ // setNextSegment of root as the left-most leaf
repPage
.getAsInternalPage()
.setNextSegAddress(getGlobalIndex(trsPage.getPageIndex(), (short)
0));
- replacePageInCache(repPage);
+
+ // mark the left-most child for interleaved flush, given the write
operation is ordered
+ cxt.invokeLastLeaf(trsPage);
+ replacePageInCache(repPage, cxt);
} else {
- insertIndexEntryRecursiveUpwards(treeTrace[0], sk,
splPage.getPageIndex());
+ // if the write starts from non-left-most leaf, it shall be recorded as
well
+ cxt.invokeLastLeaf(curPage);
+ insertIndexEntryRecursiveUpwards(cxt.treeTrace[0], sk,
splPage.getPageIndex(), cxt);
}
}
@@ -246,12 +266,14 @@ public class BTreePageManager extends PageManager {
* Insert an index entry into an internal page. Cascade insert or internal
split conducted if
* necessary.
*
+ * @param treeTraceIndex position of page index which had been traced
* @param key key of the entry
* @param ptr pointer of the entry
*/
- private void insertIndexEntryRecursiveUpwards(int treeTraceIndex, String
key, int ptr)
+ private void insertIndexEntryRecursiveUpwards(
+ int treeTraceIndex, String key, int ptr, SchemaPageContext cxt)
throws MetadataException, IOException {
- ISchemaPage idxPage = getPageInstance(treeTrace[treeTraceIndex]);
+ ISchemaPage idxPage = getPageInstance(cxt.treeTrace[treeTraceIndex], cxt);
if (idxPage.getAsInternalPage().insertRecord(key, ptr) < 0) {
// handle when insert an index entry occurring an overflow
if (treeTraceIndex > 1) {
@@ -262,14 +284,14 @@ public class BTreePageManager extends PageManager {
.getAsInternalPage()
.splitByKey(key, ptr, dstBuffer,
SchemaFileConfig.INCLINED_SPLIT);
ISchemaPage dstPage = ISchemaPage.loadSchemaPage(dstBuffer);
- registerAsNewPage(dstPage);
- insertIndexEntryRecursiveUpwards(treeTraceIndex - 1, splitKey,
dstPage.getPageIndex());
+ registerAsNewPage(dstPage, cxt);
+ insertIndexEntryRecursiveUpwards(treeTraceIndex - 1, splitKey,
dstPage.getPageIndex(), cxt);
} else {
// treeTraceIndex==1, idxPage is the root of B+Tree, to split for new
root internal
ByteBuffer splBuffer =
ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
ByteBuffer trsBuffer =
ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
- // idxPage shall be split, and reserved as root of B+Tree
+ // idxPage splits and transplants, and remains to be root of B+Tree
String splitKey =
idxPage
.getAsInternalPage()
@@ -277,8 +299,8 @@ public class BTreePageManager extends PageManager {
idxPage.getAsInternalPage().extendsTo(trsBuffer);
ISchemaPage splPage = ISchemaPage.loadSchemaPage(splBuffer);
ISchemaPage trsPage = ISchemaPage.loadSchemaPage(trsBuffer);
- registerAsNewPage(splPage);
- registerAsNewPage(trsPage);
+ registerAsNewPage(splPage, cxt);
+ registerAsNewPage(trsPage, cxt);
idxPage.getAsInternalPage().resetBuffer(trsPage.getPageIndex());
if (idxPage.getAsInternalPage().insertRecord(splitKey,
splPage.getPageIndex()) < 0) {
@@ -289,173 +311,210 @@ public class BTreePageManager extends PageManager {
.setNextSegAddress(trsPage.getAsInternalPage().getNextSegAddress());
}
}
- markDirty(idxPage);
- addPageToCache(idxPage.getPageIndex(), idxPage);
+ cxt.markDirty(idxPage);
}
@Override
public void delete(ICachedMNode node) throws IOException, MetadataException {
- // remove corresponding record
- long recSegAddr = getNodeAddress(node.getParent());
- recSegAddr = getTargetSegmentAddress(recSegAddr, node.getName());
- ISchemaPage tarPage = getPageInstance(getPageIndex(recSegAddr));
- markDirty(tarPage);
- tarPage.getAsSegmentedPage().removeRecord(getSegIndex(recSegAddr),
node.getName());
-
- // remove segments belongs to node
- if (!node.isMeasurement() && getNodeAddress(node) > 0) {
- // node with maliciously modified address may result in orphan pages
- long delSegAddr = getNodeAddress(node);
- tarPage = getPageInstance(getPageIndex(delSegAddr));
-
- if (tarPage.getAsSegmentedPage() != null) {
- // TODO: may produce fractured page
- markDirty(tarPage);
- tarPage.getAsSegmentedPage().deleteSegment(getSegIndex(delSegAddr));
- if (tarPage.getAsSegmentedPage().validSegments() == 0) {
- tarPage.getAsSegmentedPage().purgeSegments();
- }
- }
-
- if (tarPage.getAsInternalPage() != null) {
- // If the deleted one points to an Internal (root of BTree), there are
two BTrees to handle:
- // one mapping node names to record buffers, and another mapping
aliases to names. </br>
- // All of those are turned into SegmentedPage.
- Deque<Integer> cascadePages = new
ArrayDeque<>(tarPage.getAsInternalPage().getAllRecords());
- cascadePages.add(tarPage.getPageIndex());
-
- if (tarPage.getSubIndex() >= 0) {
- cascadePages.add(tarPage.getSubIndex());
- }
-
- while (!cascadePages.isEmpty()) {
- if (dirtyPages.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
- flushDirtyPages();
- }
-
- tarPage = getPageInstance(cascadePages.poll());
- if (tarPage.getAsSegmentedPage() != null) {
+ cacheGuardian();
+ SchemaPageContext cxt = new SchemaPageContext();
+ // node is the record deleted from its segment
+ entrantLock(node.getParent(), cxt);
+ try {
+ // remove corresponding record
+ long recSegAddr = getNodeAddress(node.getParent());
+ recSegAddr = getTargetSegmentAddress(recSegAddr, node.getName(), cxt);
+ ISchemaPage tarPage = getPageInstance(getPageIndex(recSegAddr), cxt);
+ cxt.markDirty(tarPage);
+ tarPage.getAsSegmentedPage().removeRecord(getSegIndex(recSegAddr),
node.getName());
+
+ // remove segments belongs to node
+ if (!node.isMeasurement() && getNodeAddress(node) > 0) {
+ // node with maliciously modified address may result in orphan pages
+ long delSegAddr = getNodeAddress(node);
+ tarPage = getPageInstance(getPageIndex(delSegAddr), cxt);
+
+ if (tarPage.getAsSegmentedPage() != null) {
+ // TODO: may produce fractured page
+ cxt.markDirty(tarPage);
+ tarPage.getAsSegmentedPage().deleteSegment(getSegIndex(delSegAddr));
+ if (tarPage.getAsSegmentedPage().validSegments() == 0) {
tarPage.getAsSegmentedPage().purgeSegments();
- markDirty(tarPage);
- addPageToCache(tarPage.getPageIndex(), tarPage);
- continue;
+ cxt.indexBuckets.sortIntoBucket(tarPage, (short) -1);
}
+ }
- if (tarPage.getAsInternalPage() != null) {
- cascadePages.addAll(tarPage.getAsInternalPage().getAllRecords());
+ if (tarPage.getAsInternalPage() != null) {
+ // If the deleted one points to an Internal (root of BTree),
+ // there are two BTrees to handle:
+ // one mapping node names to record buffers,
+ // and another mapping aliases to names. <br>
+ // All of those are turned into SegmentedPage.
+ Deque<Integer> cascadePages =
+ new ArrayDeque<>(tarPage.getAsInternalPage().getAllRecords());
+ cascadePages.add(tarPage.getPageIndex());
+
+ if (tarPage.getSubIndex() >= 0) {
+ cascadePages.add(tarPage.getSubIndex());
}
- tarPage =
- ISchemaPage.initSegmentedPage(
- ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
tarPage.getPageIndex());
- replacePageInCache(tarPage);
+ while (!cascadePages.isEmpty()) {
+ tarPage = getPageInstance(cascadePages.poll(), cxt);
+ if (tarPage.getAsSegmentedPage() != null) {
+ tarPage.getAsSegmentedPage().purgeSegments();
+ cxt.markDirty(tarPage);
+ cxt.indexBuckets.sortIntoBucket(tarPage, (short) -1);
+ continue;
+ }
+
+ if (tarPage.getAsInternalPage() != null) {
+ cascadePages.addAll(tarPage.getAsInternalPage().getAllRecords());
+ }
+
+ tarPage =
+ ISchemaPage.initSegmentedPage(
+ ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
+ tarPage.getPageIndex(),
+ tarPage.getRefCnt(),
+ tarPage.getLock());
+ replacePageInCache(tarPage, cxt);
+ }
}
}
+ flushDirtyPages(cxt);
+ } finally {
+ releaseLocks(cxt);
+ releaseReferent(cxt);
}
- flushDirtyPages();
}
@Override
public ICachedMNode getChildNode(ICachedMNode parent, String childName)
throws MetadataException, IOException {
+ // TODO unnecessary context var
+ SchemaPageContext cxt = new SchemaPageContext();
+
if (getNodeAddress(parent) < 0) {
throw new MetadataException(
String.format(
"Node [%s] has no valid segment address in pbtree file.",
parent.getFullPath()));
}
- long actualSegAddr = getTargetSegmentAddress(getNodeAddress(parent),
childName);
- ICachedMNode child =
- getPageInstance(getPageIndex(actualSegAddr))
- .getAsSegmentedPage()
- .read(getSegIndex(actualSegAddr), childName);
-
- if (child == null && parent.isDevice()) {
- // try read alias directly first
- child =
- getPageInstance(getPageIndex(actualSegAddr))
+ // a single read lock on initial page is sufficient to mutex write, no
need to trace it
+ int initIndex = getPageIndex(getNodeAddress(parent));
+ getPageInstance(initIndex, cxt).getLock().readLock().lock();
+ try {
+ long actualSegAddr = getTargetSegmentAddress(getNodeAddress(parent),
childName, cxt);
+ ICachedMNode child =
+ getPageInstance(getPageIndex(actualSegAddr), cxt)
.getAsSegmentedPage()
- .readByAlias(getSegIndex(actualSegAddr), childName);
- if (child != null) {
- return child;
- }
+ .read(getSegIndex(actualSegAddr), childName);
+
+ if (child == null && parent.isDevice()) {
+ // try read alias directly first
+ child =
+ getPageInstance(getPageIndex(actualSegAddr), cxt)
+ .getAsSegmentedPage()
+ .readByAlias(getSegIndex(actualSegAddr), childName);
+ if (child != null) {
+ return child;
+ }
- // try read with sub-index
- return getChildWithAlias(parent, childName);
+ // try read with sub-index
+ return getChildWithAlias(parent, childName);
+ }
+ return child;
+ } finally {
+ getPageInstance(initIndex, cxt).getLock().readLock().unlock();
+ releaseReferent(cxt);
}
- return child;
}
private ICachedMNode getChildWithAlias(ICachedMNode par, String alias)
throws IOException, MetadataException {
+ // TODO unnecessary context var
+ SchemaPageContext cxt = new SchemaPageContext();
+
long srtAddr = getNodeAddress(par);
- ISchemaPage page = getPageInstance(getPageIndex(srtAddr));
+ ISchemaPage page = getPageInstance(getPageIndex(srtAddr), cxt);
if (page.getAsInternalPage() == null || page.getSubIndex() < 0) {
return null;
}
- String name = searchSubIndexAlias(page.getSubIndex(), alias);
+ String name = searchSubIndexAlias(page.getSubIndex(), alias, cxt);
if (name == null) {
return null;
}
- return getTargetLeafPage(page, name).getAsSegmentedPage().read((short) 0,
name);
+ return getTargetLeafPage(page, name,
cxt).getAsSegmentedPage().read((short) 0, name);
}
@Override
public Iterator<ICachedMNode> getChildren(ICachedMNode parent)
throws MetadataException, IOException {
+ SchemaPageContext cxt = new SchemaPageContext();
int pageIdx = getPageIndex(getNodeAddress(parent));
+
short segId = getSegIndex(getNodeAddress(parent));
- ISchemaPage page = getPageInstance(pageIdx);
+ ISchemaPage page = getPageInstance(pageIdx, cxt);
+ page.getLock().readLock().lock();
- while (page.getAsSegmentedPage() == null) {
- page =
getPageInstance(getPageIndex(page.getAsInternalPage().getNextSegAddress()));
- }
+ try {
+ while (page.getAsSegmentedPage() == null) {
+ page =
getPageInstance(getPageIndex(page.getAsInternalPage().getNextSegAddress()),
cxt);
+ }
- long actualSegAddr = page.getAsSegmentedPage().getNextSegAddress(segId);
- Queue<ICachedMNode> initChildren =
page.getAsSegmentedPage().getChildren(segId);
- return new Iterator<ICachedMNode>() {
- long nextSeg = actualSegAddr;
- Queue<ICachedMNode> children = initChildren;
+ long actualSegAddr = page.getAsSegmentedPage().getNextSegAddress(segId);
+ Queue<ICachedMNode> initChildren =
page.getAsSegmentedPage().getChildren(segId);
- @Override
- public boolean hasNext() {
- if (!children.isEmpty()) {
- return true;
- }
- if (nextSeg < 0) {
- return false;
- }
+ return new Iterator<ICachedMNode>() {
+ long nextSeg = actualSegAddr;
+ Queue<ICachedMNode> children = initChildren;
- try {
- ISchemaPage nPage;
- while (children.isEmpty() && nextSeg >= 0) {
- nPage = getPageInstance(getPageIndex(nextSeg));
- children =
nPage.getAsSegmentedPage().getChildren(getSegIndex(nextSeg));
- nextSeg =
nPage.getAsSegmentedPage().getNextSegAddress(getSegIndex(nextSeg));
+ @Override
+ public boolean hasNext() {
+ if (!children.isEmpty()) {
+ return true;
+ }
+ if (nextSeg < 0) {
+ return false;
}
- } catch (MetadataException | IOException e) {
- logger.error(e.getMessage());
- return false;
- }
- return !children.isEmpty();
- }
+ try {
+ ISchemaPage nPage;
+ while (children.isEmpty() && nextSeg >= 0) {
+ nPage = getPageInstance(getPageIndex(nextSeg), cxt);
+ children =
nPage.getAsSegmentedPage().getChildren(getSegIndex(nextSeg));
+ nextSeg =
nPage.getAsSegmentedPage().getNextSegAddress(getSegIndex(nextSeg));
+ // children iteration need not pin page, consistency is
guaranteed by upper layer
+ nPage.getRefCnt().decrementAndGet();
+ }
+ } catch (MetadataException | IOException e) {
+ logger.error(e.getMessage());
+ return false;
+ }
- @Override
- public ICachedMNode next() {
- return children.poll();
- }
- };
+ return !children.isEmpty();
+ }
+
+ @Override
+ public ICachedMNode next() {
+ return children.poll();
+ }
+ };
+ } finally {
+ // safety of iterator should be guaranteed by upper layer
+ getPageInstance(pageIdx, cxt).getLock().readLock().unlock();
+ releaseReferent(cxt);
+ }
}
/** Seek non-InternalPage by name, syntax sugar of {@linkplain
#getTargetSegmentAddress}. */
- private ISchemaPage getTargetLeafPage(ISchemaPage topPage, String recKey)
+ private ISchemaPage getTargetLeafPage(ISchemaPage topPage, String recKey,
SchemaPageContext cxt)
throws IOException, MetadataException {
- treeTrace[0] = 0;
+ cxt.treeTrace[0] = 0;
if (topPage.getAsInternalPage() == null) {
return topPage;
}
@@ -464,10 +523,10 @@ public class BTreePageManager extends PageManager {
int i = 0; // mark the trace of b+ tree node
while (curPage.getAsInternalPage() != null) {
i++;
- treeTrace[i] = curPage.getPageIndex();
- curPage =
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey));
+ cxt.treeTrace[i] = curPage.getPageIndex();
+ curPage =
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey), cxt);
}
- treeTrace[0] = i; // bound in no.0 elem, points the parent the return
+ cxt.treeTrace[0] = i; // bound in no.0 elem, points the parent the return
return curPage;
}
@@ -480,10 +539,10 @@ public class BTreePageManager extends PageManager {
* @return address of the target segment.
*/
@Override
- protected long getTargetSegmentAddress(long curSegAddr, String recKey)
+ protected long getTargetSegmentAddress(long curSegAddr, String recKey,
SchemaPageContext cxt)
throws IOException, MetadataException {
- treeTrace[0] = 0;
- ISchemaPage curPage = getPageInstance(getPageIndex(curSegAddr));
+ cxt.treeTrace[0] = 0;
+ ISchemaPage curPage = getPageInstance(getPageIndex(curSegAddr), cxt);
if (curPage.getAsSegmentedPage() != null) {
return curSegAddr;
}
@@ -491,10 +550,10 @@ public class BTreePageManager extends PageManager {
int i = 0; // mark the trace of b+ tree node
while (curPage.getAsInternalPage() != null) {
i++;
- treeTrace[i] = curPage.getPageIndex();
- curPage =
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey));
+ cxt.treeTrace[i] = curPage.getPageIndex();
+ curPage =
getPageInstance(curPage.getAsInternalPage().getRecordByKey(recKey), cxt);
}
- treeTrace[0] = i; // bound in no.0 elem, points the parent the return
+ cxt.treeTrace[0] = i; // bound in no.0 elem, points the parent the return
return getGlobalIndex(curPage.getPageIndex(), (short) 0);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
index 462c348b7fc..d1014550407 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/IPageManager.java
@@ -38,9 +38,10 @@ import java.util.Iterator;
*/
public interface IPageManager {
- void writeNewChildren(ICachedMNode parNode) throws MetadataException,
IOException;
-
- void writeUpdatedChildren(ICachedMNode parNode) throws MetadataException,
IOException;
+ /**
+ * All change will be an internal process, lock and dirty pages are now in
the scope of context.
+ */
+ void writeMNode(ICachedMNode node) throws MetadataException, IOException;
void delete(ICachedMNode node) throws IOException, MetadataException;
@@ -51,8 +52,6 @@ public interface IPageManager {
void clear() throws IOException, MetadataException;
- void flushDirtyPages() throws IOException;
-
void close() throws IOException;
int getLastPageIndex();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index eb36b6d2be7..015bd38fb06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -44,19 +44,28 @@ import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile.getNodeAddress;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile.getPageIndex;
+
/**
* Abstraction for various implementation of structure of pages. But
multi-level index is hard-coded
* within the framework since only alias works like secondary index now.
@@ -67,21 +76,16 @@ public abstract class PageManager implements IPageManager {
protected static final Logger logger =
LoggerFactory.getLogger(PageManager.class);
protected final Map<Integer, ISchemaPage> pageInstCache;
- protected final Map<Integer, ISchemaPage> dirtyPages;
+ // bucket for quick retrieval, only append when write operation finished
+ protected final PageIndexSortBuckets pageIndexBuckets;
- // optimize retrieval of the smallest applicable DIRTY segmented page
- // tiered by: MIN_SEG_SIZE, PAGE/16, PAGE/8, PAGE/4, PAGE/2, PAGE_SIZE
- protected final LinkedList<Integer>[] tieredDirtyPageIndex =
- new LinkedList[SchemaFileConfig.SEG_SIZE_LST.length];
+ protected final Lock cacheLock;
+ protected final Condition cacheFull;
- protected final ReentrantLock evictLock;
- protected final PageLocks pageLocks;
+ private final Map<Long, SchemaPageContext> threadContexts;
protected final AtomicInteger lastPageIndex;
- // shift to ThreadLocal if concurrent write expected
- protected int[] treeTrace;
-
private final FileChannel channel;
// handle timeout interruption during reading
@@ -93,21 +97,20 @@ public abstract class PageManager implements IPageManager {
// flush strategy is dependent on consensus protocol, only check protocol on
init
protected FlushPageStrategy flushDirtyPagesStrategy;
+ protected SinglePageFlushStrategy singlePageFlushStrategy;
PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String
logPath)
throws IOException, MetadataException {
this.pageInstCache =
Collections.synchronizedMap(new
LinkedHashMap<>(SchemaFileConfig.PAGE_CACHE_SIZE, 1, true));
- this.dirtyPages = new ConcurrentHashMap<>();
- for (int i = 0; i < tieredDirtyPageIndex.length; i++) {
- tieredDirtyPageIndex[i] = new LinkedList<>();
- }
+ this.pageIndexBuckets = new
PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST, pageInstCache);
+ this.threadContexts = new ConcurrentHashMap<>();
+
+ this.cacheLock = new ReentrantLock();
+ this.cacheFull = this.cacheLock.newCondition();
- this.evictLock = new ReentrantLock();
- this.pageLocks = new PageLocks();
this.lastPageIndex =
lastPageIndex >= 0 ? new AtomicInteger(lastPageIndex) : new
AtomicInteger(0);
- this.treeTrace = new int[16];
this.channel = channel;
this.pmtFile = pmtFile;
this.readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
@@ -120,12 +123,14 @@ public abstract class PageManager implements IPageManager
{
logCounter = new AtomicInteger();
logWriter = null;
flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+ singlePageFlushStrategy = this::flushSinglePageWithoutLogging;
} else {
// without RATIS, utilize physical logging for integrity
int pageAcc = (int) recoverFromLog(logPath) /
SchemaFileConfig.PAGE_LENGTH;
this.logWriter = new SchemaFileLogWriter(logPath);
logCounter = new AtomicInteger(pageAcc);
flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+ singlePageFlushStrategy = this::flushSinglePageWithLogging;
}
// construct first page if file to init
@@ -134,7 +139,8 @@ public abstract class PageManager implements IPageManager {
ISchemaPage.initSegmentedPage(ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
0);
rootPage.allocNewSegment(SchemaFileConfig.SEG_MAX_SIZ);
pageInstCache.put(rootPage.getPageIndex(), rootPage);
- markDirty(rootPage);
+ rootPage.syncPageBuffer();
+ rootPage.flushPageToChannel(channel);
}
}
@@ -161,11 +167,148 @@ public abstract class PageManager implements
IPageManager {
return 0L;
}
- // region Framework Methods
+ /**
+ * A rough cache size guardian, all threads passed this entrant check will
not be limited with
+ * cache size anymore. TODO A better guardian is based on constraint per
thread.
+ */
+ protected void cacheGuardian() {
+ cacheLock.lock();
+ try {
+ while (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+ try {
+ // try to evict by LRU
+ Iterator<ISchemaPage> iterator = pageInstCache.values().iterator();
+ int pageSizeLimit = SchemaFileConfig.PAGE_CACHE_SIZE, size =
pageInstCache.size();
+
+ ISchemaPage p;
+ while (iterator.hasNext()) {
+ p = iterator.next();
+
+ if (size <= pageSizeLimit) {
+ break;
+ }
+
+ if (p.getRefCnt().get() == 0) {
+ iterator.remove();
+ size--;
+ }
+ }
+
+ if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+ // wait until another operation finished and released pages
+ cacheFull.await();
+ }
+ } catch (InterruptedException e) {
+ logger.warn(
+ "Interrupted during page cache eviction. Consider increasing
cache size, "
+ + "reducing concurrency, or extending timeout");
+ }
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+
@Override
- public void writeNewChildren(ICachedMNode node) throws MetadataException,
IOException {
+ public void writeMNode(ICachedMNode node) throws MetadataException,
IOException {
+ SchemaPageContext cxt = new SchemaPageContext();
+ threadContexts.put(Thread.currentThread().getId(), cxt);
+ cacheGuardian();
+ entrantLock(node, cxt);
+ try {
+ writeNewChildren(node, cxt);
+ writeUpdatedChildren(node, cxt);
+ flushDirtyPages(cxt);
+ } finally {
+ releaseLocks(cxt);
+ releaseReferent(cxt);
+ threadContexts.remove(Thread.currentThread().getId(), cxt);
+ }
+ }
+
+ /** Context only tracks write locks, and so it shall be released. */
+ protected void releaseLocks(SchemaPageContext cxt) throws IOException,
MetadataException {
+ for (int i : cxt.lockTraces) {
+ pageInstCache.get(i).getLock().writeLock().unlock();
+ }
+ }
+
+ /** release referents and evict likely useless page if necessary */
+ protected void releaseReferent(SchemaPageContext cxt) throws IOException,
MetadataException {
+ for (ISchemaPage p : cxt.referredPages.values()) {
+ p.getRefCnt().decrementAndGet();
+ }
+
+ if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
+ cacheLock.lock();
+ try {
+ for (ISchemaPage p : cxt.referredPages.values()) {
+ if (p.getRefCnt().get() == 0) {
+ pageInstCache.remove(p.getPageIndex());
+ }
+ }
+
+ if (pageInstCache.size() <= SchemaFileConfig.PAGE_CACHE_SIZE) {
+ cacheFull.signal();
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+ }
+
+ /** locking in the order of page index to avoid deadlock. */
+ protected void entrantLock(ICachedMNode node, SchemaPageContext cxt)
+ throws IOException, MetadataException {
+ int initPageIndex = getPageIndex(getNodeAddress(node));
+ ISchemaPage page;
+
+ if (node.isDatabase()) {
+ if (initPageIndex < 0) {
+ // node is using template
+ return;
+ }
+
+ page = getPageInstance(initPageIndex, cxt);
+ page.getLock().writeLock().lock();
+ cxt.traceLock(page);
+ cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+ } else {
+ int parIndex = getPageIndex(getNodeAddress(node.getParent()));
+
+ int minPageIndex = Math.min(initPageIndex, parIndex);
+ int maxPageIndex = Math.max(initPageIndex, parIndex);
+
+ // as InternalPage will not transplant, its parent pointer needs no lock
+ page = getPageInstance(initPageIndex, cxt);
+ if (page.getAsInternalPage() != null) {
+ page.getLock().writeLock().lock();
+ cxt.traceLock(page);
+ cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+ return;
+ }
+
+ if (minPageIndex > 0) {
+ page = getPageInstance(minPageIndex, cxt);
+ page.getLock().writeLock().lock();
+ cxt.traceLock(page);
+ cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+ }
+
+ if (minPageIndex != maxPageIndex && maxPageIndex > 0) {
+ page = getPageInstance(maxPageIndex, cxt);
+ page.getLock().writeLock().lock();
+ cxt.traceLock(page);
+ cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+ }
+ }
+ }
+
+ // region Framework Methods
+ private void writeNewChildren(ICachedMNode node, SchemaPageContext cxt)
+ throws MetadataException, IOException {
int subIndex;
- long curSegAddr = SchemaFile.getNodeAddress(node);
+ long curSegAddr = getNodeAddress(node);
long actualAddress; // actual segment to write record
ICachedMNode child;
ISchemaPage curPage;
@@ -181,7 +324,7 @@ public abstract class PageManager implements IPageManager {
if (!child.isMeasurement()) {
alias = null;
- if (SchemaFile.getNodeAddress(child) >= 0) {
+ if (getNodeAddress(child) >= 0) {
// new child with a valid segment address, weird
throw new MetadataException(
String.format(
@@ -192,7 +335,7 @@ public abstract class PageManager implements IPageManager {
// pre-allocate except that child is a device node using template
if (!(child.isDevice() && child.getAsDeviceMNode().isUseTemplate())) {
short estSegSize = estimateSegmentSize(child);
- long glbIndex = preAllocateSegment(estSegSize);
+ long glbIndex = preAllocateSegment(estSegSize, cxt);
SchemaFile.setNodeAddress(child, glbIndex);
}
} else {
@@ -204,34 +347,34 @@ public abstract class PageManager implements IPageManager
{
// prepare buffer to write
childBuffer = RecordUtils.node2Buffer(child);
- actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey());
- curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress));
+ actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
+ curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
try {
curPage
.getAsSegmentedPage()
.write(SchemaFile.getSegIndex(actualAddress), entry.getKey(),
childBuffer);
- markDirty(curPage);
- addPageToCache(curPage.getPageIndex(), curPage);
+ interleavedFlush(curPage, cxt);
+ cxt.markDirty(curPage);
- subIndex = subIndexRootPage(curSegAddr);
+ subIndex = subIndexRootPage(curSegAddr, cxt);
if (alias != null && subIndex >= 0) {
- insertSubIndexEntry(subIndex, alias, entry.getKey());
+ insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
}
} catch (SchemaPageOverflowException e) {
if
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
== SchemaFileConfig.SEG_MAX_SIZ) {
// curPage might be replaced so unnecessary to mark it here
- multiPageInsertOverflowOperation(curPage, entry.getKey(),
childBuffer);
+ multiPageInsertOverflowOperation(curPage, entry.getKey(),
childBuffer, cxt);
- subIndex = subIndexRootPage(curSegAddr);
+ subIndex = subIndexRootPage(curSegAddr, cxt);
if (node.isDevice() && subIndex < 0) {
// the record occurred overflow had been inserted already
- buildSubIndex(node);
+ buildSubIndex(node, cxt);
} else if (alias != null) {
// implied node is entity, so sub index must exist
- insertSubIndexEntry(subIndex, alias, entry.getKey());
+ insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
}
} else {
// transplant and delete former segment
@@ -243,27 +386,30 @@ public abstract class PageManager implements IPageManager
{
.getNewChildBuffer()
.entrySet()
.size());
- ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSize);
+ ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSize,
cxt);
// with single segment, curSegAddr equals actualAddress
curSegAddr =
newPage.transplantSegment(curPage.getAsSegmentedPage(),
actSegId, newSegSize);
newPage.write(SchemaFile.getSegIndex(curSegAddr), entry.getKey(),
childBuffer);
curPage.getAsSegmentedPage().deleteSegment(actSegId);
+
+ // entrant lock guarantees thread-safe
SchemaFile.setNodeAddress(node, curSegAddr);
- updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
- markDirty(curPage);
- addPageToCache(curPage.getPageIndex(), curPage);
+ updateParentalRecord(node.getParent(), node.getName(), curSegAddr,
cxt);
+
+ // newPage is marked and referred within getMinAppl method
+ cxt.markDirty(curPage);
}
}
}
}
- @Override
- public void writeUpdatedChildren(ICachedMNode node) throws
MetadataException, IOException {
+ private void writeUpdatedChildren(ICachedMNode node, SchemaPageContext cxt)
+ throws MetadataException, IOException {
boolean removeOldSubEntry = false, insertNewSubEntry = false;
int subIndex;
- long curSegAddr = SchemaFile.getNodeAddress(node);
+ long curSegAddr = getNodeAddress(node);
long actualAddress; // actual segment to write record
String alias, oldAlias; // key of the sub-index entry now
ICachedMNode child, oldChild;
@@ -272,10 +418,10 @@ public abstract class PageManager implements IPageManager
{
for (Map.Entry<String, ICachedMNode> entry :
ICachedMNodeContainer.getCachedMNodeContainer(node).getUpdatedChildBuffer().entrySet())
{
child = entry.getValue();
- actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey());
+ actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
childBuffer = RecordUtils.node2Buffer(child);
- curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress));
+ curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
if
(curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress),
entry.getKey())
== null) {
throw new MetadataException(
@@ -319,34 +465,34 @@ public abstract class PageManager implements IPageManager
{
curPage
.getAsSegmentedPage()
.update(SchemaFile.getSegIndex(actualAddress), entry.getKey(),
childBuffer);
- markDirty(curPage);
- addPageToCache(curPage.getPageIndex(), curPage);
+ interleavedFlush(curPage, cxt);
+ cxt.markDirty(curPage);
- subIndex = subIndexRootPage(curSegAddr);
+ subIndex = subIndexRootPage(curSegAddr, cxt);
if (subIndex >= 0) {
if (removeOldSubEntry) {
- removeSubIndexEntry(subIndex, oldAlias);
+ removeSubIndexEntry(subIndex, oldAlias, cxt);
}
if (insertNewSubEntry) {
- insertSubIndexEntry(subIndex, alias, entry.getKey());
+ insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
}
}
} catch (SchemaPageOverflowException e) {
if
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
== SchemaFileConfig.SEG_MAX_SIZ) {
- multiPageUpdateOverflowOperation(curPage, entry.getKey(),
childBuffer);
+ multiPageUpdateOverflowOperation(curPage, entry.getKey(),
childBuffer, cxt);
- subIndex = subIndexRootPage(curSegAddr);
+ subIndex = subIndexRootPage(curSegAddr, cxt);
if (node.isDevice() && subIndex < 0) {
- buildSubIndex(node);
+ buildSubIndex(node, cxt);
} else if (insertNewSubEntry || removeOldSubEntry) {
if (removeOldSubEntry) {
- removeSubIndexEntry(subIndex, oldAlias);
+ removeSubIndexEntry(subIndex, oldAlias, cxt);
}
if (insertNewSubEntry) {
- insertSubIndexEntry(subIndex, alias, entry.getKey());
+ insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
}
}
} else {
@@ -355,7 +501,7 @@ public abstract class PageManager implements IPageManager {
short newSegSiz =
reEstimateSegSize(
curPage.getAsSegmentedPage().getSegmentSize(actSegId) +
childBuffer.capacity());
- ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSiz);
+ ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSiz,
cxt);
// assign new segment address
curSegAddr = newPage.transplantSegment(curPage.getAsSegmentedPage(),
actSegId, newSegSiz);
@@ -363,9 +509,8 @@ public abstract class PageManager implements IPageManager {
newPage.update(SchemaFile.getSegIndex(curSegAddr), entry.getKey(),
childBuffer);
SchemaFile.setNodeAddress(node, curSegAddr);
- updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
- markDirty(curPage);
- addPageToCache(curPage.getPageIndex(), curPage);
+ updateParentalRecord(node.getParent(), node.getName(), curSegAddr,
cxt);
+ cxt.markDirty(curPage);
}
}
}
@@ -374,8 +519,8 @@ public abstract class PageManager implements IPageManager {
// endregion
// region Abstract and Overridable Methods
- protected abstract long getTargetSegmentAddress(long curAddr, String recKey)
- throws IOException, MetadataException;
+ protected abstract long getTargetSegmentAddress(
+ long curAddr, String recKey, SchemaPageContext cxt) throws IOException,
MetadataException;
/**
* Deal with split, transplant and index update about the overflow occurred
by insert.
@@ -385,7 +530,7 @@ public abstract class PageManager implements IPageManager {
* @param childBuffer content of the key
*/
protected abstract void multiPageInsertOverflowOperation(
- ISchemaPage curPage, String key, ByteBuffer childBuffer)
+ ISchemaPage curPage, String key, ByteBuffer childBuffer,
SchemaPageContext cxt)
throws MetadataException, IOException;
/**
@@ -394,7 +539,7 @@ public abstract class PageManager implements IPageManager {
* @return the top internal page
*/
protected abstract void multiPageUpdateOverflowOperation(
- ISchemaPage curPage, String key, ByteBuffer childBuffer)
+ ISchemaPage curPage, String key, ByteBuffer childBuffer,
SchemaPageContext cxt)
throws MetadataException, IOException;
/**
@@ -402,7 +547,8 @@ public abstract class PageManager implements IPageManager {
*
* @param parNode node needs to build subordinate index.
*/
- protected abstract void buildSubIndex(ICachedMNode parNode) throws
MetadataException, IOException;
+ protected abstract void buildSubIndex(ICachedMNode parNode,
SchemaPageContext cxt)
+ throws MetadataException, IOException;
/**
* Insert an entry of subordinate index of the target node.
@@ -411,68 +557,127 @@ public abstract class PageManager implements
IPageManager {
* @param key key of the sub-index entry.
* @param rec value of the sub-index entry.
*/
- protected abstract void insertSubIndexEntry(int base, String key, String rec)
+ protected abstract void insertSubIndexEntry(
+ int base, String key, String rec, SchemaPageContext cxt)
throws MetadataException, IOException;
- protected abstract void removeSubIndexEntry(int base, String oldAlias)
+ protected abstract void removeSubIndexEntry(int base, String oldAlias,
SchemaPageContext cxt)
throws MetadataException, IOException;
- protected abstract String searchSubIndexAlias(int base, String alias)
+ protected abstract String searchSubIndexAlias(int base, String alias,
SchemaPageContext cxt)
throws MetadataException, IOException;
// endregion
- // region General Interfaces
- @Override
- public int getLastPageIndex() {
- return lastPageIndex.get();
+ // region Flush Strategy
+ @FunctionalInterface
+ interface FlushPageStrategy {
+ void apply(List<ISchemaPage> dirtyPages) throws IOException;
}
@FunctionalInterface
- interface FlushPageStrategy {
- void apply() throws IOException;
+ interface SinglePageFlushStrategy {
+ void apply(ISchemaPage page) throws IOException;
}
- private void flushDirtyPagesWithLogging() throws IOException {
+ private void flushDirtyPagesWithLogging(List<ISchemaPage> dirtyPages) throws
IOException {
+ if (dirtyPages.size() == 0) {
+ return;
+ }
+
if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
logWriter = logWriter.renew();
logCounter.set(0);
}
logCounter.addAndGet(dirtyPages.size());
- for (ISchemaPage page : dirtyPages.values()) {
+ for (ISchemaPage page : dirtyPages) {
page.syncPageBuffer();
logWriter.write(page);
}
logWriter.prepare();
- for (ISchemaPage page : dirtyPages.values()) {
+ for (ISchemaPage page : dirtyPages) {
page.flushPageToChannel(channel);
}
logWriter.commit();
}
- private void flushDirtyPagesWithoutLogging() throws IOException {
- for (ISchemaPage page : dirtyPages.values()) {
+ private void flushSinglePageWithLogging(ISchemaPage page) throws IOException
{
+ if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
+ logWriter = logWriter.renew();
+ logCounter.set(0);
+ }
+
+ logCounter.addAndGet(1);
+ page.syncPageBuffer();
+ logWriter.write(page);
+ logWriter.prepare();
+ page.flushPageToChannel(channel);
+ logWriter.commit();
+ }
+
+ private void flushDirtyPagesWithoutLogging(List<ISchemaPage> dirtyPages)
throws IOException {
+ for (ISchemaPage page : dirtyPages) {
page.syncPageBuffer();
page.flushPageToChannel(channel);
}
}
- @Override
- public void flushDirtyPages() throws IOException {
- if (dirtyPages.size() == 0) {
+ private void flushSinglePageWithoutLogging(ISchemaPage page) throws
IOException {
+ page.syncPageBuffer();
+ page.flushPageToChannel(channel);
+ }
+
+ public synchronized void flushDirtyPages(SchemaPageContext cxt) throws
IOException {
+ if (cxt.dirtyCnt == 0) {
return;
}
- flushDirtyPagesStrategy.apply();
- dirtyPages.clear();
- Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
+ flushDirtyPagesStrategy.apply(
+ cxt.referredPages.values().stream()
+ .filter(ISchemaPage::isDirtyPage)
+ .collect(Collectors.toList()));
+ cxt.appendBucketIndex(pageIndexBuckets);
+ }
+
+ /**
+ * when page prepares to insert, compare it with lastLeaf from context. if
not same, then flush
+ * the lastLeaf, unlock, deref, and remove it from dirtyPages. lastLeafPage
only initiated at
+ * overflowOperation
+ */
+ private void interleavedFlush(ISchemaPage page, SchemaPageContext cxt)
throws IOException {
+ if (cxt.lastLeafPage == null || cxt.lastLeafPage.getPageIndex() ==
page.getPageIndex()) {
+ return;
+ }
+ cxt.interleavedFlushCnt++;
+ singlePageFlushStrategy.apply(cxt.lastLeafPage);
+ // this lastLeaf shall only be lock once
+ cxt.dirtyCnt--;
+
+ // unlock and deref the page from context
+ if (cxt.lockTraces.contains(cxt.lastLeafPage.getPageIndex())) {
+ cxt.lastLeafPage.getLock().writeLock().unlock();
+ cxt.lockTraces.remove(cxt.lastLeafPage.getPageIndex());
+ }
+ cxt.lastLeafPage.getRefCnt().decrementAndGet();
+
+ // can be reclaimed since the page only referred by pageInstCache
+ cxt.referredPages.remove(cxt.lastLeafPage.getPageIndex());
+ // alleviate eviction pressure
+ pageInstCache.remove(cxt.lastLeafPage);
+
+ cxt.lastLeafPage = page.getAsSegmentedPage();
+ }
+ // endregion
+
+ // region General Interfaces
+ @Override
+ public int getLastPageIndex() {
+ return lastPageIndex.get();
}
@Override
public void clear() throws IOException, MetadataException {
- dirtyPages.clear();
- Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
pageInstCache.clear();
lastPageIndex.set(0);
logWriter = logWriter == null ? null : logWriter.renew();
@@ -480,9 +685,10 @@ public abstract class PageManager implements IPageManager {
@Override
public void inspect(PrintWriter pw) throws IOException, MetadataException {
+ SchemaPageContext cxt = new SchemaPageContext();
String pageContent;
for (int i = 0; i <= lastPageIndex.get(); i++) {
- pageContent = getPageInstance(i).inspect();
+ pageContent = getPageInstance(i, cxt).inspect();
pw.print("---------------------\n");
pw.print(pageContent);
pw.print("\n");
@@ -500,182 +706,113 @@ public abstract class PageManager implements
IPageManager {
// region Page Access Management
- public ISchemaPage getPageInstance(int pageIdx) throws IOException,
MetadataException {
+ /** Any page returned will be pinned/referred by the cxt. * */
+ public ISchemaPage getPageInstance(int pageIdx, SchemaPageContext cxt)
+ throws IOException, MetadataException {
if (pageIdx > lastPageIndex.get()) {
throw new MetadataException(String.format("Page index %d out of range.",
pageIdx));
}
- pageLocks.readLock(pageIdx);
- try {
- if (dirtyPages.containsKey(pageIdx)) {
- return dirtyPages.get(pageIdx);
- }
-
- if (pageInstCache.containsKey(pageIdx)) {
- return pageInstCache.get(pageIdx);
- }
- } finally {
- pageLocks.readUnlock(pageIdx);
+ // just return from (thread local) context
+ if (cxt != null && cxt.referredPages.containsKey(pageIdx)) {
+ return cxt.referredPages.get(pageIdx);
}
+ // lock so that no duplicate page with the same index is loaded from disk, and to guarantee the
page will not be evicted
+ // by another thread before it is referred by the current thread
+ cacheLock.lock();
try {
- pageLocks.writeLock(pageIdx);
+ ISchemaPage page = pageInstCache.get(pageIdx);
+ if (page != null) {
+ cxt.refer(page);
+ return page;
+ }
ByteBuffer newBuf = ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH);
-
loadFromFile(newBuf, pageIdx);
- return addPageToCache(pageIdx, ISchemaPage.loadSchemaPage(newBuf));
+ page = ISchemaPage.loadSchemaPage(newBuf);
+ cxt.refer(page);
+ addPageToCache(page);
+ return page;
} finally {
- pageLocks.writeUnlock(pageIdx);
+ cacheLock.unlock();
}
}
- private long preAllocateSegment(short size) throws IOException,
MetadataException {
- ISegmentedPage page = getMinApplSegmentedPageInMem(size);
+ private long preAllocateSegment(short size, SchemaPageContext cxt)
+ throws IOException, MetadataException {
+ ISegmentedPage page = getMinApplSegmentedPageInMem(size, cxt);
return SchemaFile.getGlobalIndex(page.getPageIndex(),
page.allocNewSegment(size));
}
- protected ISchemaPage replacePageInCache(ISchemaPage page) {
- markDirty(page);
- return addPageToCache(page.getPageIndex(), page);
+ protected ISchemaPage replacePageInCache(ISchemaPage page, SchemaPageContext
cxt) {
+ // no need to lock since the root of B+Tree is locked
+ cxt.markDirty(page, true);
+ addPageToCache(page);
+ return page;
}
/**
- * Accessing dirtyPages will be expedited, while the retrieval from
pageInstCache remains
- * unaffected due to its limited capacity.
+ * Only accessed during write operation and every page returned is due to be
modified thus they
+ * are all marked dirty.
*
* @param size size of the expected segment
+ * @return always write locked
*/
- protected ISegmentedPage getMinApplSegmentedPageInMem(short size) {
- ISchemaPage targetPage = null;
- int tierLoopCnt = 0;
- for (int i = 0; i < tieredDirtyPageIndex.length && dirtyPages.size() > 0;
i++) {
- tierLoopCnt = tieredDirtyPageIndex[i].size();
- while (size < SchemaFileConfig.SEG_SIZE_LST[i] && tierLoopCnt > 0) {
- targetPage = dirtyPages.get(tieredDirtyPageIndex[i].pop());
- tierLoopCnt--;
-
- // check validity of the retrieved targetPage, as the page type may
have changed,
- // e.g., from SegmentedPage to InternalPage, or index could be stale
- if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
- // invalid index for SegmentedPage, drop the index and get next
- continue;
- }
-
- // suitable page for requested size
- if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
- sortSegmentedIntoIndex(targetPage, size);
- return targetPage.getAsSegmentedPage();
- }
-
- // not large enough but legal for another retrieval
- if
(targetPage.getAsSegmentedPage().isCapableForSegSize(SchemaFileConfig.SEG_SIZE_LST[i]))
{
- tieredDirtyPageIndex[i].add(targetPage.getPageIndex());
- }
- }
+ protected ISegmentedPage getMinApplSegmentedPageInMem(short size,
SchemaPageContext cxt) {
+ // pages retrieved from context is unnecessary and inefficient to lock
+ ISchemaPage targetPage = cxt.indexBuckets.getNearestFitPage(size, false);
+ if (targetPage != null) {
+ cxt.indexBuckets.sortIntoBucket(targetPage, size);
+ return targetPage.getAsSegmentedPage();
}
- // TODO refactor design related to pageInstCache to index its pages in
further development
- for (Map.Entry<Integer, ISchemaPage> entry : pageInstCache.entrySet()) {
- if (entry.getValue().getAsSegmentedPage() != null
- && entry.getValue().getAsSegmentedPage().isCapableForSegSize(size)) {
- markDirty(entry.getValue());
- return pageInstCache.get(entry.getKey()).getAsSegmentedPage();
- }
- }
- return allocateNewSegmentedPage().getAsSegmentedPage();
- }
+ // pageIndexBuckets sorts pages within pageInstCache into buckets to
expedite access
+ targetPage = pageIndexBuckets.getNearestFitPage(size, true);
+ if (targetPage != null) {
+ cxt.markDirty(targetPage);
+ cxt.traceLock(targetPage);
- /**
- * Index SegmentedPage inside {@linkplain #dirtyPages} into tiered list by
{@linkplain
- * SchemaFileConfig#SEG_SIZE_LST}.
- *
- * <p>The level of its index depends on its AVAILABLE space.
- *
- * @param page SegmentedPage to be indexed, no guardian statements since all
entrances are secured
- * for now
- * @param newSegSize to re-integrate after a retrieval, the expected
overhead shall be considered.
- * -1 for common dirty mark.
- */
- protected void sortSegmentedIntoIndex(ISchemaPage page, short newSegSize) {
- // actual space occupied by a segment includes both its own length and the
length of its offset.
- // so available length for a segment is the spareSize minus the offset
length
- short availableSize =
- newSegSize < 0
- ? (short) (page.getAsSegmentedPage().getSpareSize() -
SchemaFileConfig.SEG_OFF_DIG)
- : (short)
- (page.getAsSegmentedPage().getSpareSize()
- - newSegSize
- - SchemaFileConfig.SEG_OFF_DIG);
-
- // too small to index
- if (availableSize < SchemaFileConfig.SEG_HEADER_SIZE) {
- return;
+ // transfer the page from pageIndexBuckets to cxt.buckets thus not be
accessed by other WRITE
+ // thread
+ cxt.indexBuckets.sortIntoBucket(targetPage, size);
+ return targetPage.getAsSegmentedPage();
}
- // index range like: SEG_HEADER_SIZE <= [0] < SEG_SIZE_LST[0], ...
- for (int i = 0; i < SchemaFileConfig.SEG_SIZE_LST.length; i++) {
- // the last of SEG_SIZE_LST is the maximum page size, definitely larger
than others
- if (availableSize < SchemaFileConfig.SEG_SIZE_LST[i]) {
- tieredDirtyPageIndex[i].add(page.getPageIndex());
- return;
- }
- }
+ // due to be dirty thus its index only sorted into local buckets
+ targetPage = allocNewSegmentedPage(cxt);
+ cxt.indexBuckets.sortIntoBucket(targetPage, size);
+ return targetPage.getAsSegmentedPage();
}
- protected synchronized ISchemaPage allocateNewSegmentedPage() {
- lastPageIndex.incrementAndGet();
+ protected ISchemaPage allocNewSegmentedPage(SchemaPageContext cxt) {
ISchemaPage newPage =
ISchemaPage.initSegmentedPage(
- ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
lastPageIndex.get());
- markDirty(newPage);
- return addPageToCache(newPage.getPageIndex(), newPage);
+ ByteBuffer.allocate(SchemaFileConfig.PAGE_LENGTH),
lastPageIndex.incrementAndGet());
+ // lock and mark to be consistent with # getMinApplSegmentedPageInMem
+ cxt.markDirty(newPage);
+ newPage.getLock().writeLock().lock();
+ cxt.traceLock(newPage);
+ addPageToCache(newPage);
+ return newPage;
}
- protected synchronized ISchemaPage registerAsNewPage(ISchemaPage page) {
+ protected ISchemaPage registerAsNewPage(ISchemaPage page, SchemaPageContext
cxt) {
+ // new page will be LOCAL until the creating thread finished
+ // thus not added to sparsePageIndex now
page.setPageIndex(lastPageIndex.incrementAndGet());
- markDirty(page);
- return addPageToCache(page.getPageIndex(), page);
- }
-
- protected void markDirty(ISchemaPage page) {
- page.markDirty();
- dirtyPages.put(page.getPageIndex(), page);
-
- if (page.getAsSegmentedPage() != null) {
- sortSegmentedIntoIndex(page, (short) -1);
- }
+ addPageToCache(page);
+ cxt.markDirty(page);
+ return page;
}
- protected ISchemaPage addPageToCache(int pageIndex, ISchemaPage page) {
- pageInstCache.put(pageIndex, page);
- // only one thread evicts and flushes pages
- if (evictLock.tryLock()) {
- try {
- if (pageInstCache.size() > SchemaFileConfig.PAGE_CACHE_SIZE) {
- int removeCnt =
- (int) (0.2 * pageInstCache.size()) > 0 ? (int) (0.2 *
pageInstCache.size()) : 1;
- List<Integer> rmvIds = new
ArrayList<>(pageInstCache.keySet()).subList(0, removeCnt);
-
- for (Integer id : rmvIds) {
- // dirty pages only flushed from dirtyPages
- if (pageLocks.findLock(id).writeLock().tryLock()) {
- try {
- pageInstCache.remove(id);
- } finally {
- pageLocks.findLock(id).writeLock().unlock();
- }
- }
- }
- }
- } finally {
- evictLock.unlock();
- }
- }
- return page;
+ protected ISchemaPage addPageToCache(ISchemaPage page) {
+ // size control is left to operation entrance
+ // return value could be used to assess whether the key already existed
+ return this.pageInstCache.put(page.getPageIndex(), page);
}
- private synchronized int loadFromFile(ByteBuffer dst, int pageIndex) throws
IOException {
+ private int loadFromFile(ByteBuffer dst, int pageIndex) throws IOException {
dst.clear();
if (!readChannel.isOpen()) {
readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
@@ -683,24 +820,26 @@ public abstract class PageManager implements IPageManager
{
return readChannel.read(dst, getPageAddress(pageIndex));
}
- private void updateParentalRecord(ICachedMNode parent, String key, long
newSegAddr)
+ private void updateParentalRecord(
+ ICachedMNode parent, String key, long newSegAddr, SchemaPageContext cxt)
throws IOException, MetadataException {
if (parent == null || parent.getChild(key).isDatabase()) {
throw new MetadataException("Root page shall not be migrated.");
}
- long parSegAddr = parent.getParent() == null ? 0L :
SchemaFile.getNodeAddress(parent);
- parSegAddr = getTargetSegmentAddress(parSegAddr, key);
- ISchemaPage page = getPageInstance(SchemaFile.getPageIndex(parSegAddr));
+ long parSegAddr = parent.getParent() == null ? 0L : getNodeAddress(parent);
+ parSegAddr = getTargetSegmentAddress(parSegAddr, key, cxt);
+ ISchemaPage page = getPageInstance(SchemaFile.getPageIndex(parSegAddr),
cxt);
((SegmentedPage)
page).updateRecordSegAddr(SchemaFile.getSegIndex(parSegAddr), key, newSegAddr);
- markDirty(page);
+ cxt.markDirty(page);
}
// endregion
// region Inner Utilities
- private int subIndexRootPage(long addr) throws IOException,
MetadataException {
- return getPageInstance(SchemaFile.getPageIndex(addr)).getSubIndex();
+ private int subIndexRootPage(long addr, SchemaPageContext cxt)
+ throws IOException, MetadataException {
+ return getPageInstance(SchemaFile.getPageIndex(addr), cxt).getSubIndex();
}
private static long getPageAddress(int pageIndex) {
@@ -755,7 +894,7 @@ public abstract class PageManager implements IPageManager {
* here. Supposed to merge with SchemaFile#reEstimateSegSize.
*
* @param expSize expected size calculated from next new record
- * @param batchSize size of children within one {@linkplain
#writeNewChildren(ICachedMNode)}
+ * @param batchSize size of children within one
#writeNewChildren(ICachedMNode)
* @return estimated size
* @throws MetadataException
*/
@@ -798,54 +937,228 @@ public abstract class PageManager implements
IPageManager {
// region TestOnly Methods
+ @TestOnly
+ public String checkAllContexts() {
+ StringBuilder builder = new StringBuilder();
+
+ for (SchemaPageContext cxt : threadContexts.values()) {
+ builder.append(
+ String.format(
+ "cxt[%d] has %d pages, %d lock trace\n",
+ cxt.threadID, cxt.referredPages.size(), cxt.lockTraces.size()));
+ }
+
+ return builder.toString();
+ }
+
+ @TestOnly
+ public static String checkContextLock(SchemaPageContext cxt) {
+ StringBuilder builder = new StringBuilder();
+
+ for (ISchemaPage page : cxt.referredPages.values()) {
+ builder.append(
+ String.format(
+ "page:%d, wl:%d, rl:%d, trace:%b, dirty:%b;\n",
+ page.getPageIndex(),
+ ((ReentrantReadWriteLock) page.getLock()).getWriteHoldCount(),
+ ((ReentrantReadWriteLock) page.getLock()).getReadLockCount(),
+ cxt.lockTraces.contains(page.getPageIndex()),
+ page.isDirtyPage()));
+ }
+ return builder.toString();
+ }
+
@TestOnly
public long getTargetSegmentAddressOnTest(long curSegAddr, String recKey)
throws IOException, MetadataException {
- return getTargetSegmentAddress(curSegAddr, recKey);
+ return getTargetSegmentAddress(curSegAddr, recKey, new
SchemaPageContext());
}
@TestOnly
public ISchemaPage getPageInstanceOnTest(int pageIdx) throws IOException,
MetadataException {
- return getPageInstance(pageIdx);
+ SchemaPageContext cxt = new SchemaPageContext();
+ return getPageInstance(pageIdx, cxt);
}
// endregion
- private static class PageLocks {
- /**
- * number of reentrant read write lock. Notice that this number should be
a prime number for
- * uniform hash
- */
- private static final int NUM_OF_LOCKS = 1039;
+ /** Thread local variables about write/update process. */
+ protected static class SchemaPageContext {
+ final long threadID;
+ final PageIndexSortBuckets indexBuckets;
+ // locked and dirty pages are all referred pages, they all reside in page
cache
+ final Map<Integer, ISchemaPage> referredPages;
+ final Set<Integer> lockTraces;
+ // track B+Tree traversal trace
+ final int[] treeTrace;
+ int dirtyCnt;
+ int interleavedFlushCnt;
+
+ // flush B+Tree leaf before operation finished since all records are
ordered
+ ISegmentedPage lastLeafPage;
+
+ public SchemaPageContext() {
+ threadID = Thread.currentThread().getId();
+ referredPages = new HashMap<>();
+ indexBuckets = new PageIndexSortBuckets(SchemaFileConfig.SEG_SIZE_LST,
referredPages);
+ treeTrace = new int[16];
+ lockTraces = new HashSet<>();
+ lastLeafPage = null;
+ dirtyCnt = 0;
+ interleavedFlushCnt = 0;
+ }
+
+ public void markDirty(ISchemaPage page) {
+ markDirty(page, false);
+ }
- /** locks array */
- private final ReentrantReadWriteLock[] locks;
+ private void markDirty(ISchemaPage page, boolean forceReplace) {
+ if (!page.isDirtyPage()) {
+ dirtyCnt++;
+ }
+ page.setDirtyFlag();
+ refer(page);
+ if (forceReplace && referredPages.containsKey(page.getPageIndex())) {
- protected PageLocks() {
- locks = new ReentrantReadWriteLock[NUM_OF_LOCKS];
- for (int i = 0; i < NUM_OF_LOCKS; i++) {
- locks[i] = new ReentrantReadWriteLock();
+ // previous page is dirty, so it's not a new dirty page
+ if (referredPages.get(page.getPageIndex()).isDirtyPage()) {
+ dirtyCnt--;
+ }
+ // force to replace
+ referredPages.put(page.getPageIndex(), page);
}
}
- public void readLock(int hash) {
- findLock(hash).readLock().lock();
+ private void traceLock(ISchemaPage page) {
+ refer(page);
+ lockTraces.add(page.getPageIndex());
}
- public void readUnlock(int hash) {
- findLock(hash).readLock().unlock();
+ // referred pages will not be evicted until operation finished
+ private void refer(ISchemaPage page) {
+ if (!referredPages.containsKey(page.getPageIndex())) {
+ page.getRefCnt().incrementAndGet();
+ referredPages.put(page.getPageIndex(), page);
+ }
+ }
+
+ /**
+ * Since records are ordered for write operation, it is reasonable to
flush those left siblings
+ * of the active leaf page. The target page would be initiated at the
first split within the
+ * operation.
+ *
+ * @param page left leaf of the split
+ */
+ public void invokeLastLeaf(ISchemaPage page) {
+ // only record at the first split
+ if (lastLeafPage == null) {
+ lastLeafPage = page.getAsSegmentedPage();
+ }
}
- public void writeLock(int hash) {
- findLock(hash).writeLock().lock();
+ private void appendBucketIndex(PageIndexSortBuckets pisb) {
+ for (int i = 0; i < indexBuckets.buckets.length; i++) {
+ pisb.buckets[i].addAll(indexBuckets.buckets[i]);
+ }
}
+ }
- public void writeUnlock(int hash) {
- findLock(hash).writeLock().unlock();
+ /**
+ * * Index buckets affiliated to a page collection. Indexes are sorted into
buckets according to
+ * spare space of its corresponding page instance.
+ */
+ protected static class PageIndexSortBuckets {
+ private final short[] bounds;
+ private final LinkedList<Integer>[] buckets;
+ private final Map<Integer, ISchemaPage> pageContainer;
+
+ public PageIndexSortBuckets(short[] borders, Map<Integer, ISchemaPage>
container) {
+ bounds = Arrays.copyOf(borders, borders.length);
+ buckets = new LinkedList[borders.length];
+ pageContainer = container;
+ for (int i = 0; i < borders.length; i++) {
+ buckets[i] = new LinkedList<>();
+ }
}
- private ReentrantReadWriteLock findLock(int hash) {
- return locks[hash % NUM_OF_LOCKS];
+ public void clear() {
+ for (Queue<Integer> q : buckets) {
+ q.clear();
+ }
+ }
+
+ public void sortIntoBucket(ISchemaPage page, short newSegSize) {
+ if (page.getAsSegmentedPage() == null) {
+ return;
+ }
+
+ // actual space occupied by a segment includes both its own length and
the length of its
+ // offset. so available length for a segment is the spareSize minus the
offset bytes
+ short availableSize =
+ newSegSize < 0
+ ? (short) (page.getAsSegmentedPage().getSpareSize() -
SchemaFileConfig.SEG_OFF_DIG)
+ : (short)
+ (page.getAsSegmentedPage().getSpareSize()
+ - newSegSize
+ - SchemaFileConfig.SEG_OFF_DIG);
+
+ // too small to index
+ if (availableSize <= SchemaFileConfig.SEG_HEADER_SIZE) {
+ return;
+ }
+
+ // be like: SEG_HEADER < buckets[0] <= bounds[0] < buckets[1] <= ...
+ for (int i = 0; i < bounds.length; i++) {
+ // the last of SEG_SIZE_LST is the maximum page size, definitely
larger than others
+ if (availableSize <= bounds[i]) {
+ buckets[i].add(page.getPageIndex());
+ return;
+ }
+ }
+ }
+
+ /**
+ * @param withLock set if page container is a global/shared object
+ * @return the page index will be removed from the bucket.
+ */
+ public synchronized ISchemaPage getNearestFitPage(short size, boolean
withLock) {
+ ISchemaPage targetPage;
+ int elemToCheck;
+ for (int i = 0; i < buckets.length && pageContainer.size() > 0; i++) {
+ // buckets[i] stores pages with spare space less than bounds[i]
+ elemToCheck = buckets[i].size();
+ while (size < bounds[i] && elemToCheck > 0) {
+ // find roughly fit page
+ targetPage = pageContainer.getOrDefault(buckets[i].poll(), null);
+ elemToCheck--;
+
+ if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
+ // act as lazy remove on buckets
+ continue;
+ }
+
+ // seek in global container thus other page could be read locked
+ if (withLock
+ && targetPage.getAsSegmentedPage().isCapableForSegSize(size)
+ && targetPage.getLock().writeLock().tryLock()) {
+ if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+ return targetPage.getAsSegmentedPage();
+ }
+ targetPage.getLock().writeLock().unlock();
+ }
+
+ // only in local dirty pages which are always write locked by itself
+ if (!withLock &&
targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+ return targetPage;
+ }
+
+ // not large as expected, fit into suitable bucket
+ if (i > 0 &&
targetPage.getAsSegmentedPage().isCapableForSegSize(bounds[0])) {
+ sortIntoBucket(targetPage, (short) -1);
+ }
+ }
+ }
+ return null;
}
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index 71baa0cdc47..83f72b8e5f3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -135,7 +135,9 @@ public class SchemaFileLogTest {
sf.getChildren(lastNode);
fail();
} catch (Exception e) {
- Assert.assertEquals("Segment(index:0) not found in page(index:2).",
e.getMessage());
+ Assert.assertEquals(
+ String.format("Segment(index:0) not found in page(index:%d).",
corruptPageIndex),
+ e.getMessage());
} finally {
sf.close();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 987185b9258..86490650270 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -230,7 +230,6 @@ public class SchemaFileTest {
nsf.writeMNode(vt4);
Assert.assertEquals(11111L, nsf.init().getAsDatabaseMNode().getDataTTL());
-
nsf.close();
}