This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0144133481c PBTree Improvement: WrappedSegment Layout Revision for
Reduced GC Overhead (#11652)
0144133481c is described below
commit 0144133481ccd95b29673f8dbde2ac65fa7f79a1
Author: ZhaoXin <[email protected]>
AuthorDate: Tue Dec 5 10:21:54 2023 +0800
PBTree Improvement: WrappedSegment Layout Revision for Reduced GC Overhead
(#11652)
---
.../mtree/impl/pbtree/schemafile/InternalPage.java | 18 +-
.../mtree/impl/pbtree/schemafile/SchemaPage.java | 2 +-
.../mtree/impl/pbtree/schemafile/Segment.java | 478 --------------
.../impl/pbtree/schemafile/WrappedSegment.java | 708 +++++++++++++++++----
4 files changed, 584 insertions(+), 622 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
index 6e82448b58c..724e97f67bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
@@ -39,7 +39,7 @@ public class InternalPage extends SchemaPage implements
ISegment<Integer, Intege
private long firstLeaf;
private int subIndexPage;
- private transient String penultKey, lastKey;
+ private String penultKey, lastKey;
/**
* <b>Compound pointers will not be deserialized as any Java Objects since
it may contains massive
@@ -584,19 +584,17 @@ public class InternalPage extends SchemaPage implements
ISegment<Integer, Intege
}
}
- private String getKeyByOffset(short offset) {
- synchronized (this.pageBuffer) {
- this.pageBuffer.limit(this.pageBuffer.capacity());
- this.pageBuffer.position(offset);
- return ReadWriteIOUtils.readString(this.pageBuffer);
- }
- }
-
private String getKeyByIndex(int index) {
if (index <= 0 || index >= memberNum) {
throw new IndexOutOfBoundsException();
}
- return getKeyByOffset(keyOffset(getPointerByIndex(index)));
+ synchronized (pageBuffer) {
+ this.pageBuffer.limit(this.pageBuffer.capacity());
+ this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + index *
COMPOUND_POINT_LENGTH);
+ short ofs = (short) (this.pageBuffer.getLong() &
SchemaFileConfig.COMP_PTR_OFFSET_MASK);
+ this.pageBuffer.position(ofs);
+ return ReadWriteIOUtils.readString(this.pageBuffer);
+ }
}
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
index b9d325df8ce..d72c80b6da3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
@@ -42,7 +42,7 @@ public abstract class SchemaPage implements ISchemaPage {
protected short spareSize; // traces spare space size simultaneously
protected short memberNum; // amount of the member, definition depends on
implementation
- protected volatile boolean dirtyFlag = false; // any modification turns it
true
+ protected boolean dirtyFlag = false; // any modification turns it true
protected SchemaPage(ByteBuffer pageBuffer) {
this.pageBuffer = pageBuffer;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/Segment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/Segment.java
deleted file mode 100644
index 09c3f09c8ac..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/Segment.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import
org.apache.iotdb.db.exception.metadata.schemafile.ColossalRecordException;
-import
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig.SEG_HEADER_SIZE;
-
-/**
- * Segments store [String]keys with bytebuffer records, which can be
serialized IMNode, bytes of
- * alias string or any other attributes need to store with hierarchy structure.
- *
- * @param <R> type to construct from bytebuffer
- */
-public abstract class Segment<R> implements ISegment<ByteBuffer, R> {
-
- // members load from buffer
- protected final ByteBuffer buffer;
- protected short length, freeAddr, recordNum, pairLength;
- protected boolean delFlag;
- protected long prevSegAddress, nextSegAddress;
-
- // reconstruct from key-address pair buffer
- protected List<Pair<String, Short>> keyAddressList;
-
- // assess monotonic
- protected String penuKey = null, lastKey = null;
-
- /**
- * Init Segment with a buffer, which contains all information about this
segment
- *
- * <p>For a page no more than 16 kib, a signed short is enough to index all
bytes inside a
- * segment.
- *
- * <p><b>Segment Structure:</b>
- * <li>25 byte: header
- * <li>1 short: length, segment length
- * <li>1 short: freeAddr, start offset of records
- * <li>1 short: recordNum, amount of records in this segment
- * <li>1 short: pairLength, length of key-address in bytes
- * <li>1 long (8 bytes): prevSegIndex, previous segment index
- * <li>1 long (8 bytes): nextSegIndex, next segment index
- * <li>1 bit: delFlag, delete flag <br>
- * (--- checksum, parent record address, max/min record key may be
contained further ---)
- * <li>var length: key-address pairs, begin at 25 bytes offset, length of
pairLength <br>
- * ... empty space ...
- * <li>var length: records
- */
- protected Segment(ByteBuffer buffer, boolean override) {
- this.buffer = buffer;
-
- this.buffer.clear();
- if (override) {
- // blank segment
- length = (short) buffer.capacity();
- freeAddr = (short) buffer.capacity();
- recordNum = 0;
- pairLength = 0;
-
- // these two address need to be initiated as same as in childrenContainer
- prevSegAddress = -1;
- nextSegAddress = -1;
- delFlag = false;
- // parRecord = lastSegAddr = nextSegAddr = 0L;
-
- keyAddressList = new ArrayList<>();
- } else {
- length = ReadWriteIOUtils.readShort(buffer);
- freeAddr = ReadWriteIOUtils.readShort(buffer);
- recordNum = ReadWriteIOUtils.readShort(buffer);
- pairLength = ReadWriteIOUtils.readShort(buffer);
-
- // parRecord = ReadWriteIOUtils.readLong(buffer);
- prevSegAddress = ReadWriteIOUtils.readLong(buffer);
- nextSegAddress = ReadWriteIOUtils.readLong(buffer);
- delFlag = ReadWriteIOUtils.readBool(buffer);
-
- buffer.position(SEG_HEADER_SIZE);
- buffer.limit(SEG_HEADER_SIZE + pairLength);
- ByteBuffer pairBuffer = buffer.slice();
- buffer.clear(); // reconstruction finished, reset buffer position and
limit
- reconstructKeyAddress(pairBuffer);
- }
- }
-
- private void reconstructKeyAddress(ByteBuffer pairBuffer) {
- keyAddressList = new ArrayList<>();
- for (int idx = 0; idx < recordNum; idx++) {
- String key = ReadWriteIOUtils.readString(pairBuffer);
- Short address = ReadWriteIOUtils.readShort(pairBuffer);
- keyAddressList.add(new Pair<>(key, address));
- }
- }
-
- @Override
- public boolean hasRecordKey(String key) {
- return binarySearchPairList(keyAddressList, key) > -1;
- }
-
- @Override
- public boolean hasRecordAlias(String alias) {
- return false;
- }
-
- @Override
- public void syncBuffer() {
- ByteBuffer prefBuffer = ByteBuffer.allocate(SEG_HEADER_SIZE + pairLength);
-
- ReadWriteIOUtils.write(length, prefBuffer);
- ReadWriteIOUtils.write(freeAddr, prefBuffer);
- ReadWriteIOUtils.write(recordNum, prefBuffer);
- ReadWriteIOUtils.write(pairLength, prefBuffer);
- ReadWriteIOUtils.write(prevSegAddress, prefBuffer);
- ReadWriteIOUtils.write(nextSegAddress, prefBuffer);
- ReadWriteIOUtils.write(delFlag, prefBuffer);
-
- prefBuffer.position(SEG_HEADER_SIZE);
-
- for (Pair<String, Short> pair : keyAddressList) {
- ReadWriteIOUtils.write(pair.left, prefBuffer);
- ReadWriteIOUtils.write(pair.right, prefBuffer);
- }
-
- prefBuffer.clear();
- this.buffer.clear();
- this.buffer.put(prefBuffer);
- }
-
- @Override
- public short size() {
- return length;
- }
-
- @Override
- public short getSpareSize() {
- return (short) (freeAddr - pairLength - SEG_HEADER_SIZE);
- }
-
- @Override
- public void delete() {
- this.delFlag = true;
- this.buffer.clear();
- this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE - 1);
- ReadWriteIOUtils.write(true, this.buffer);
- }
-
- @Override
- public long getNextSegAddress() {
- return nextSegAddress;
- }
-
- @Override
- public void setNextSegAddress(long nextSegAddress) {
- this.nextSegAddress = nextSegAddress;
- }
-
- @Override
- public void extendsTo(ByteBuffer newBuffer) throws MetadataException {
- short sizeGap = (short) (newBuffer.capacity() - length);
-
- if (sizeGap < 0) {
- throw new MetadataException("Leaf Segment cannot extend to a smaller
buffer.");
- }
-
- this.buffer.clear();
- newBuffer.clear();
-
- if (sizeGap == 0) {
- this.syncBuffer();
- this.buffer.clear();
- newBuffer.put(this.buffer);
- this.buffer.clear();
- newBuffer.clear();
- return;
- }
-
- ReadWriteIOUtils.write((short) newBuffer.capacity(), newBuffer);
- ReadWriteIOUtils.write((short) (freeAddr + sizeGap), newBuffer);
- ReadWriteIOUtils.write(recordNum, newBuffer);
- ReadWriteIOUtils.write(pairLength, newBuffer);
- ReadWriteIOUtils.write(prevSegAddress, newBuffer);
- ReadWriteIOUtils.write(nextSegAddress, newBuffer);
- ReadWriteIOUtils.write(delFlag, newBuffer);
-
- newBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
- for (Pair<String, Short> pair : keyAddressList) {
- ReadWriteIOUtils.write(pair.left, newBuffer);
- ReadWriteIOUtils.write((short) (pair.right + sizeGap), newBuffer);
- }
-
- this.buffer.clear();
- this.buffer.position(freeAddr);
- this.buffer.limit(length);
- newBuffer.position(freeAddr + sizeGap);
- newBuffer.put(this.buffer);
- newBuffer.clear();
- this.buffer.clear();
- }
-
- @Override
- public ByteBuffer resetBuffer(int ptr) {
- freeAddr = (short) this.buffer.capacity();
- recordNum = 0;
- pairLength = 0;
- prevSegAddress = -1;
- nextSegAddress = -1;
- keyAddressList.clear();
- syncBuffer();
- this.buffer.clear();
- return this.buffer.slice();
- }
-
- @Override
- public String splitByKey(
- String key, ByteBuffer recBuf, ByteBuffer dstBuffer, boolean
inclineSplit)
- throws MetadataException {
-
- if (this.buffer.capacity() != dstBuffer.capacity()) {
- throw new MetadataException("Segments only splits with same capacity.");
- }
-
- if (keyAddressList.size() == 0) {
- throw new MetadataException("Segment can not be split with no records.");
- }
-
- if (key == null && keyAddressList.size() == 1) {
- throw new MetadataException("Segment can not be split with only one
record.");
- }
-
- // notice that key can be null here
- boolean monotonic =
- penuKey != null
- && key != null
- && lastKey != null
- && inclineSplit
- && (key.compareTo(lastKey)) * (lastKey.compareTo(penuKey)) > 0;
-
- int n = keyAddressList.size();
-
- // actual index of key just smaller than the insert, -2 for null key
- int pos = key != null ? binaryInsertPairList(keyAddressList, key) - 1 : -2;
-
- int sp; // virtual index to split
- if (monotonic) {
- // new entry into part with more space
- sp = key.compareTo(lastKey) > 0 ? Math.max(pos + 1, n / 2) :
Math.min(pos + 2, n / 2);
- } else {
- sp = n / 2;
- }
-
- // little different from InternalSegment, only the front edge key can not
split
- sp = sp <= 0 ? 1 : sp;
-
- // prepare header for dstBuffer
- short length = this.length,
- freeAddr = (short) dstBuffer.capacity(),
- recordNum = 0,
- pairLength = 0;
- boolean delFlag = false;
- long prevSegAddress = this.prevSegAddress, nextSegAddress =
this.nextSegAddress;
-
- int recSize, keySize;
- String mKey, sKey = null;
- ByteBuffer srcBuf;
- int aix; // aix for actual index on keyAddressList
- n = key == null ? n - 1 : n; // null key
- // TODO: implement bulk split further
- for (int ix = sp; ix <= n; ix++) {
- if (ix == pos + 1) {
- // migrate newly insert
- srcBuf = recBuf;
- recSize = recBuf.capacity();
- mKey = key;
- keySize = 4 + mKey.getBytes().length;
-
- recBuf.clear();
- } else {
- srcBuf = this.buffer;
- // pos equals -2 if key is null
- aix = (ix > pos) && (pos != -2) ? ix - 1 : ix;
- mKey = keyAddressList.get(aix).left;
- keySize = 4 + mKey.getBytes().length;
-
- // prepare on this.buffer
- this.buffer.clear();
- this.buffer.position(keyAddressList.get(aix).right);
- recSize = getRecordLength();
- this.buffer.limit(this.buffer.position() + recSize);
-
- this.recordNum--;
- }
-
- if (ix == sp) {
- // search key is the first key in split segment
- sKey = mKey;
- }
-
- freeAddr -= recSize;
- dstBuffer.position(freeAddr);
- dstBuffer.put(srcBuf);
-
- dstBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE + pairLength);
- ReadWriteIOUtils.write(mKey, dstBuffer);
- ReadWriteIOUtils.write(freeAddr, dstBuffer);
-
- recordNum++;
- pairLength += keySize + 2;
- }
-
- // compact and update status
- this.keyAddressList = this.keyAddressList.subList(0, this.recordNum);
- compactRecords();
- if (sp > pos + 1 && key != null) {
- // new insert shall be in this
- if (insertRecord(key, recBuf) < 0) {
- throw new ColossalRecordException(key, recBuf.capacity());
- }
- }
-
- // flush dstBuffer header
- dstBuffer.clear();
- ReadWriteIOUtils.write(length, dstBuffer);
- ReadWriteIOUtils.write(freeAddr, dstBuffer);
- ReadWriteIOUtils.write(recordNum, dstBuffer);
- ReadWriteIOUtils.write(pairLength, dstBuffer);
- ReadWriteIOUtils.write(prevSegAddress, dstBuffer);
- ReadWriteIOUtils.write(nextSegAddress, dstBuffer);
- ReadWriteIOUtils.write(delFlag, dstBuffer);
-
- penuKey = null;
- lastKey = null;
- return sKey;
- }
-
- protected void compactRecords() {
- // compact by existed item on keyAddressList
- ByteBuffer tempBuf = ByteBuffer.allocate(this.buffer.capacity() -
this.freeAddr);
- int accSiz = 0;
- this.pairLength = 0;
- for (Pair<String, Short> pair : keyAddressList) {
- this.pairLength += pair.left.getBytes().length + 4 + 2;
- this.buffer.clear();
- this.buffer.position(pair.right);
- this.buffer.limit(pair.right + getRecordLength());
-
- accSiz += this.buffer.remaining();
- pair.right = (short) (this.buffer.capacity() - accSiz);
-
- tempBuf.position(tempBuf.capacity() - accSiz);
- tempBuf.put(this.buffer);
- }
- tempBuf.clear();
- tempBuf.position(tempBuf.capacity() - accSiz);
- this.freeAddr = (short) (this.buffer.capacity() - accSiz);
-
- this.buffer.clear();
- this.buffer.position(this.freeAddr);
- this.buffer.put(tempBuf);
-
- this.syncBuffer();
- }
-
- /** Assuming that buffer has been set position well, record length depends
on implementation. */
- protected abstract short getRecordLength();
-
- protected static <T> int binarySearchPairList(List<Pair<String, T>> list,
String key) {
- int head = 0;
- int tail = list.size() - 1;
- if (tail < 0
- || key.compareTo(list.get(head).left) < 0
- || key.compareTo(list.get(tail).left) > 0) {
- return -1;
- }
- if (key.compareTo(list.get(head).left) == 0) {
- return head;
- }
- if (key.compareTo(list.get(tail).left) == 0) {
- return tail;
- }
- int pivot = (head + tail) / 2;
- while (key.compareTo(list.get(pivot).left) != 0) {
- if (head == tail || pivot == head || pivot == tail) {
- return -1;
- }
- if (key.compareTo(list.get(pivot).left) < 0) {
- tail = pivot;
- } else if (key.compareTo(list.get(pivot).left) > 0) {
- head = pivot;
- }
- pivot = (head + tail) / 2;
- }
- return pivot;
- }
-
- /**
- * @return target index the record with passing in key should be inserted
- * @throws RecordDuplicatedException
- */
- protected static <T> int binaryInsertPairList(List<Pair<String, T>> list,
String key)
- throws RecordDuplicatedException {
- if (list.isEmpty()) {
- return 0;
- }
-
- int tarIdx = 0;
- int head = 0;
- int tail = list.size() - 1;
-
- if (list.get(head).left.compareTo(key) == 0 ||
list.get(tail).left.compareTo(key) == 0) {
- throw new RecordDuplicatedException(key);
- }
-
- if (key.compareTo(list.get(head).left) < 0) {
- return 0;
- }
-
- if (key.compareTo(list.get(tail).left) > 0) {
- return list.size();
- }
-
- int pivot;
- while (head != tail) {
- pivot = (head + tail) / 2;
- // notice pivot always smaller than list.size()-1
- if (list.get(pivot).left.compareTo(key) == 0
- || list.get(pivot + 1).left.compareTo(key) == 0) {
- throw new RecordDuplicatedException(key);
- }
-
- if (list.get(pivot).left.compareTo(key) < 0 && list.get(pivot +
1).left.compareTo(key) > 0) {
- return pivot + 1;
- }
-
- if (pivot == head || pivot == tail) {
- if (list.get(head).left.compareTo(key) > 0) {
- return head;
- }
- if (list.get(tail).left.compareTo(key) < 0) {
- return tail + 1;
- }
- }
-
- // impossible for pivot.cmp > 0 and (pivot+1).cmp < 0
- if (list.get(pivot).left.compareTo(key) > 0) {
- tail = pivot;
- }
-
- if (list.get(pivot + 1).left.compareTo(key) < 0) {
- head = pivot;
- }
- }
- return tarIdx;
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
index a5042fa67f6..c535a811a05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
@@ -20,6 +20,7 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.exception.metadata.schemafile.ColossalRecordException;
import
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
import
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -31,49 +32,75 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
+import java.nio.ShortBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig.SEG_HEADER_SIZE;
+
/**
- * This class initiate a segment object with corresponding bytes. Implements
add, get, remove
- * records which is serialized ICacheMNode. <br>
+ * The physical instance that stores buffer of IMNodes. <br>
* Act like a wrapper of a bytebuffer which reflects a segment. <br>
* And itself is wrapped inside a SchemaPage.
*/
-public class WrappedSegment extends Segment<ICachedMNode> {
+public class WrappedSegment implements ISegment<ByteBuffer, ICachedMNode> {
+
+ // members loaded from buffer
+ protected final ByteBuffer buffer;
+ protected short length, freeAddr, recordNum, pairLength;
+ protected boolean delFlag, aliasFlag;
+ protected long prevSegAddress, nextSegAddress;
- // reconstruct every initiation after keyAddressList but not write into
buffer
- private List<Pair<String, String>> aliasKeyList;
+ // last two keys seen, used by splitByKey to assess whether insertions are monotonic
+ protected String penuKey = null, lastKey = null;
/**
- * Init Segment with a buffer, which contains all information about this
segment
- *
- * <p>For a page no more than 16 kib, a signed short is enough to index all
bytes inside a
- * segment.
- *
- * <p><b>Segment Structure:</b>
- * <li>25 byte: header
+ * <b>Segment Structure:</b>
+ * <li>(25 byte: header)
* <li>1 short: length, segment length
* <li>1 short: freeAddr, start offset of records
* <li>1 short: recordNum, amount of records in this segment
* <li>1 short: pairLength, length of key-address in bytes
* <li>1 long (8 bytes): prevSegIndex, previous segment index
* <li>1 long (8 bytes): nextSegIndex, next segment index
- * <li>1 bit: delFlag, delete flag <br>
+ * <li>1 bit: delFlag, delete flag
+ * <li>1 bit: aliasFlag, set once a record carrying an alias has been inserted<br>
* (--- checksum, parent record address, max/min record key may be
contained further ---)
- * <li>var length: key-address pairs, begin at 25 bytes offset, length of
pairLength <br>
+ * <li><s>var length: key-address pairs, begin at 25 bytes offset, length of
pairLength </s><br>
+ * <li>var length: ORDERED record offsets, one short per record, pairLength bytes in total<br>
* ... empty space ...
- * <li>var length: records
+ * <li>var length: (record key, record body) * recordNum
*/
- public WrappedSegment(ByteBuffer buffer, boolean override) throws
RecordDuplicatedException {
- super(buffer, override);
+ public WrappedSegment(ByteBuffer buffer, boolean override) {
+ this.buffer = buffer;
+ this.buffer.clear();
if (override) {
- aliasKeyList = new ArrayList<>();
+ // blank segment
+ length = (short) buffer.capacity();
+ freeAddr = (short) buffer.capacity();
+ recordNum = 0;
+ pairLength = 0;
+
+ // these two addresses need to be initialized the same way as in childrenContainer
+ prevSegAddress = -1;
+ nextSegAddress = -1;
+ delFlag = false;
+ aliasFlag = false;
+
} else {
- reconstructAliasAddressList();
+ length = ReadWriteIOUtils.readShort(buffer);
+ freeAddr = ReadWriteIOUtils.readShort(buffer);
+ recordNum = ReadWriteIOUtils.readShort(buffer);
+ pairLength = ReadWriteIOUtils.readShort(buffer);
+
+ prevSegAddress = ReadWriteIOUtils.readLong(buffer);
+ nextSegAddress = ReadWriteIOUtils.readLong(buffer);
+ byte flags = ReadWriteIOUtils.readByte(buffer);
+ delFlag = (0x80 & flags) != 0;
+ aliasFlag = (0x40 & flags) != 0;
}
}
@@ -102,54 +129,120 @@ public class WrappedSegment extends
Segment<ICachedMNode> {
return new WrappedSegment(buffer, false);
}
- private void reconstructAliasAddressList() throws RecordDuplicatedException {
- if (aliasKeyList != null) {
- aliasKeyList.clear();
- } else {
- aliasKeyList = new ArrayList<>();
- }
+ // region Interface Implementation
- ByteBuffer bufferR = this.buffer.asReadOnlyBuffer();
- bufferR.clear();
- for (Pair<String, Short> p : keyAddressList) {
- if (p.right >= 0) {
- bufferR.position(p.right);
- String alias = RecordUtils.getRecordAlias(bufferR);
- if (alias != null) {
- aliasKeyList.add(binaryInsertPairList(aliasKeyList, alias), new
Pair<>(alias, p.left));
- }
- }
- }
+ @Override
+ public boolean hasRecordKey(String key) {
+ return binarySearchOnKeys(key) > -1;
}
- // region Interface Implementation
+ @Override
+ public boolean hasRecordAlias(String alias) {
+ return getIndexByAlias(alias) > -1;
+ }
+
+ @Override
+ public synchronized void syncBuffer() {
+ ByteBuffer prefBuffer = ByteBuffer.allocate(SEG_HEADER_SIZE);
+
+ ReadWriteIOUtils.write(length, prefBuffer);
+ ReadWriteIOUtils.write(freeAddr, prefBuffer);
+ ReadWriteIOUtils.write(recordNum, prefBuffer);
+ ReadWriteIOUtils.write(pairLength, prefBuffer);
+ ReadWriteIOUtils.write(prevSegAddress, prefBuffer);
+ ReadWriteIOUtils.write(nextSegAddress, prefBuffer);
+ ReadWriteIOUtils.write(getFlag(), prefBuffer);
+
+ prefBuffer.clear();
+ this.buffer.clear();
+ this.buffer.put(prefBuffer);
+ }
+
+ private byte getFlag() {
+ byte flags = delFlag ? (byte) 0x80 : (byte) 0x00;
+ flags = (byte) (aliasFlag ? flags | 0x40 : flags | 0x00);
+ return flags;
+ }
+
+ @Override
+ public short size() {
+ return length;
+ }
+
+ @Override
+ public short getSpareSize() {
+ return (short) (freeAddr - pairLength - SEG_HEADER_SIZE);
+ }
+
+ @Override
+ public void delete() {
+ this.delFlag = true;
+ this.buffer.clear();
+ this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE - 1);
+ ReadWriteIOUtils.write(getFlag(), this.buffer);
+ }
+
+ @Override
+ public long getNextSegAddress() {
+ return nextSegAddress;
+ }
+
+ @Override
+ public void setNextSegAddress(long nextSegAddress) {
+ this.nextSegAddress = nextSegAddress;
+ }
@Override
public synchronized int insertRecord(String key, ByteBuffer buf)
throws RecordDuplicatedException {
buf.clear();
- int recordStartAddr = freeAddr - buf.capacity();
+ // key and body are stored adjacently
+ byte[] ikBytes = key.getBytes();
+ int recordStartAddr = freeAddr - buf.capacity() - 4 - ikBytes.length;
- int newPairLength = pairLength + key.getBytes().length + 4 + 2;
+ int newPairLength = pairLength + 2;
if (recordStartAddr < SchemaFileConfig.SEG_HEADER_SIZE + newPairLength) {
return -1;
}
pairLength = (short) newPairLength;
- int tarIdx = binaryInsertPairList(keyAddressList, key);
+ int tarIdx = binaryInsertOnKeys(key);
- // update aliasKeyList
+ // FIXME: EXPENSIVE cross-check that the new key does not duplicate a sibling's alias
+ if (aliasFlag && getIndexByAlias(key) != -1) {
+ throw new RecordDuplicatedException(
+ String.format("Record [%s] has conflict name with alias of its
siblings.", key));
+ }
+
+ // check alias-key duplication, set flag if necessary
String alias = RecordUtils.getRecordAlias(buf);
if (alias != null && !alias.equals("")) {
- aliasKeyList.add(binaryInsertPairList(aliasKeyList, alias), new
Pair<>(alias, key));
+ if (binarySearchOnKeys(alias) >= 0 || getIndexByAlias(alias) != -1) {
+ throw new RecordDuplicatedException(
+ String.format("Record [%s] has conflict alias [%s] with its
siblings.", key, alias));
+ }
+ aliasFlag = true;
}
buf.clear();
this.buffer.clear();
this.buffer.position(recordStartAddr);
+ ReadWriteIOUtils.write(key, this.buffer);
this.buffer.put(buf);
- keyAddressList.add(tarIdx, new Pair<>(key, (short) recordStartAddr));
+
+ this.buffer.clear().position(SchemaFileConfig.SEG_HEADER_SIZE + tarIdx *
2);
+ int shiftOffsets = recordNum - tarIdx;
+ if (shiftOffsets > 0) {
+ short[] shifts = new short[shiftOffsets];
+ this.buffer.asShortBuffer().get(shifts);
+ this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE + tarIdx * 2);
+ this.buffer.putShort((short) recordStartAddr);
+ this.buffer.asShortBuffer().put(shifts);
+ } else {
+ this.buffer.putShort((short) recordStartAddr);
+ }
+
this.freeAddr = (short) recordStartAddr;
this.recordNum++;
@@ -158,64 +251,307 @@ public class WrappedSegment extends
Segment<ICachedMNode> {
return recordStartAddr - pairLength - SchemaFileConfig.SEG_HEADER_SIZE;
}
+ private int getIndexByAlias(String target) {
+ ByteBuffer checkBuffer = this.buffer.asReadOnlyBuffer();
+ short[] offsets = new short[recordNum];
+
+ checkBuffer
+ .position(SchemaFileConfig.SEG_HEADER_SIZE)
+ .limit(SchemaFileConfig.SEG_HEADER_SIZE + pairLength);
+ checkBuffer.asShortBuffer().get(offsets);
+
+ checkBuffer.clear();
+ int keySize;
+ for (int i = 0; i < offsets.length; i++) {
+ checkBuffer.position(offsets[i]);
+ keySize = checkBuffer.getInt();
+ checkBuffer.position(checkBuffer.position() + keySize);
+ if (target.equals(RecordUtils.getRecordAlias(checkBuffer))) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public void extendsTo(ByteBuffer newBuffer) throws MetadataException {
+ short sizeGap = (short) (newBuffer.capacity() - length);
+
+ if (sizeGap < 0) {
+ throw new MetadataException("Leaf Segment cannot extend to a smaller
buffer.");
+ }
+
+ this.buffer.clear();
+ newBuffer.clear();
+
+ if (sizeGap == 0) {
+ this.syncBuffer();
+ this.buffer.clear();
+ newBuffer.put(this.buffer);
+ this.buffer.clear();
+ newBuffer.clear();
+ return;
+ }
+
+ ReadWriteIOUtils.write((short) newBuffer.capacity(), newBuffer);
+ ReadWriteIOUtils.write((short) (freeAddr + sizeGap), newBuffer);
+ ReadWriteIOUtils.write(recordNum, newBuffer);
+ ReadWriteIOUtils.write(pairLength, newBuffer);
+ ReadWriteIOUtils.write(prevSegAddress, newBuffer);
+ ReadWriteIOUtils.write(nextSegAddress, newBuffer);
+ ReadWriteIOUtils.write(getFlag(), newBuffer);
+
+ newBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
+ this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
+ for (int i = 0; i < recordNum; i++) {
+ newBuffer.putShort((short) (this.buffer.getShort() + sizeGap));
+ }
+
+ this.buffer.clear();
+ this.buffer.position(freeAddr);
+ this.buffer.limit(length);
+ newBuffer.position(freeAddr + sizeGap);
+ newBuffer.put(this.buffer);
+ newBuffer.clear();
+ this.buffer.clear();
+ }
+
+ @Override
+ public ByteBuffer resetBuffer(int ptr) {
+ freeAddr = (short) this.buffer.capacity();
+ recordNum = 0;
+ pairLength = 0;
+ prevSegAddress = -1;
+ nextSegAddress = -1;
+ delFlag = false;
+ aliasFlag = false;
+ syncBuffer();
+ this.buffer.clear();
+ return this.buffer.slice();
+ }
+
+ /**
+ * Splits this segment in two. Entries from the split index onward (in key order, counting the
+ * entry being inserted when {@code key} is not null) are written to {@code dstBuffer}; this
+ * segment is then compacted, and the new entry is inserted here if it sorts before the split
+ * index. Finally the header of {@code dstBuffer} is written and the remembered
+ * {@code penuKey}/{@code lastKey} are reset to null.
+ *
+ * @param key key of the record to insert, or null to split evenly without inserting anything
+ * @param recBuf serialized record for {@code key}; only read when {@code key} is not null
+ * @param dstBuffer buffer of the new segment; must have the same capacity as this segment
+ * @param inclineSplit whether the split point may be biased when {@code key}, {@code lastKey}
+ * and {@code penuKey} are ordered in one direction
+ * @return the first key written to {@code dstBuffer}
+ * @throws MetadataException if the capacities differ, the segment has no records, an even split
+ * is requested on a single-record segment, or (as ColossalRecordException) the new record
+ * cannot be inserted back into this segment
+ */
@Override
public synchronized String splitByKey(
String key, ByteBuffer recBuf, ByteBuffer dstBuffer, boolean
inclineSplit)
throws MetadataException {
- String sk = super.splitByKey(key, recBuf, dstBuffer, inclineSplit);
- reconstructAliasAddressList();
- return sk;
+
+ if (this.buffer.capacity() != dstBuffer.capacity()) {
+ throw new MetadataException("Segments only splits with same capacity.");
+ }
+
+ if (recordNum == 0) {
+ throw new MetadataException("Segment can not be split with no records.");
+ }
+
+ if (key == null && recordNum == 1) {
+ throw new MetadataException("Segment can not be split with only one
record.");
+ }
+
+ // notice that key can be null here, and a null key means even split
+ // monotonic: both comparisons share a sign, i.e. penuKey, lastKey, key run in one direction
+ // NOTE(review): the product of two compareTo results is an int multiplication; confirm it
+ // cannot overflow for the keys stored here
+ boolean monotonic =
+ penuKey != null
+ && key != null
+ && lastKey != null
+ && inclineSplit
+ && (key.compareTo(lastKey)) * (lastKey.compareTo(penuKey)) > 0;
+
+ int n = recordNum;
+
+ // actual index of key just smaller than the insert, -2 for null key
+ int pos = key != null ? binaryInsertOnKeys(key) - 1 : -2;
+
+ int sp; // virtual index to split
+ if (monotonic) {
+ // new entry into part with more space
+ sp = key.compareTo(lastKey) > 0 ? Math.max(pos + 1, n / 2) :
Math.min(pos + 2, n / 2);
+ } else {
+ sp = n / 2;
+ }
+
+ // little different from InternalSegment, only the front edge key can not split
+ sp = sp <= 0 ? 1 : sp;
+
+ short recordLeft = this.recordNum;
+ // prepare header for dstBuffer
+ // NOTE: the locals below shadow the fields of the same name and describe dstBuffer only;
+ // helper methods called in the loop (getKeyByIndex, getOffsetByIndex) still use the fields
+ short length = this.length,
+ freeAddr = (short) dstBuffer.capacity(),
+ recordNum = 0,
+ pairLength = 0;
+ long prevSegAddress = this.prevSegAddress, nextSegAddress =
this.nextSegAddress;
+
+ int recSize, keySize;
+ String mKey, sKey = null;
+ ByteBuffer srcBuf;
+ int aix; // aix for actual index on keyAddressList
+ // virtual indices run over the existing records plus the new one; without a new record the
+ // last virtual index is n - 1
+ n = key == null ? n - 1 : n; // null key
+ // TODO: implement bulk split further
+ for (int ix = sp; ix <= n; ix++) {
+ if (ix == pos + 1) {
+ // migrate newly insert
+ srcBuf = recBuf;
+ recSize = recBuf.capacity();
+ mKey = key;
+ keySize = 4 + mKey.getBytes().length;
+
+ recBuf.clear();
+ } else {
+ srcBuf = this.buffer;
+ // pos equals -2 if key is null
+ // virtual indices behind the new record map to the stored index one lower
+ aix = (ix > pos) && (pos != -2) ? ix - 1 : ix;
+ mKey = getKeyByIndex(aix);
+ keySize = 4 + mKey.getBytes().length;
+
+ // prepare on this.buffer
+ this.buffer.clear();
+ this.buffer.position(getOffsetByIndex(aix) + keySize);
+ recSize = getRecordLength();
+ this.buffer.limit(this.buffer.position() + recSize);
+
+ recordLeft--;
+ }
+
+ if (ix == sp) {
+ // search key is the first key in split segment
+ sKey = mKey;
+ }
+
+ // entries are packed downwards from the end of dstBuffer: key first, then the record body
+ freeAddr -= recSize + keySize;
+ dstBuffer.position(freeAddr);
+ ReadWriteIOUtils.write(mKey, dstBuffer);
+ dstBuffer.put(srcBuf);
+
+ // append the entry's offset to the offset array right behind the header
+ dstBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE + pairLength);
+ ReadWriteIOUtils.write(freeAddr, dstBuffer);
+
+ recordNum++;
+ pairLength += 2;
+ }
+
+ // compact and update status
+ this.recordNum = recordLeft;
+ compactRecords();
+ if (sp > pos + 1 && key != null) {
+ // new insert shall be in this
+ if (insertRecord(key, recBuf) < 0) {
+ throw new ColossalRecordException(key, recBuf.capacity());
+ }
+ }
+
+ // flush dstBuffer header
+ dstBuffer.clear();
+ ReadWriteIOUtils.write(length, dstBuffer);
+ ReadWriteIOUtils.write(freeAddr, dstBuffer);
+ ReadWriteIOUtils.write(recordNum, dstBuffer);
+ ReadWriteIOUtils.write(pairLength, dstBuffer);
+ ReadWriteIOUtils.write(prevSegAddress, dstBuffer);
+ ReadWriteIOUtils.write(nextSegAddress, dstBuffer);
+ // FIXME flag of split page is not always the same
+ ReadWriteIOUtils.write(getFlag(), dstBuffer);
+
+ penuKey = null;
+ lastKey = null;
+ return sKey;
+ }
+
+ /**
+ * Packs the entries referenced by the first {@code recordNum} offset slots contiguously at the
+ * end of the buffer, in slot order, then rewrites the offset array, updates {@code freeAddr} and
+ * syncs the buffer.
+ */
+ protected void compactRecords() {
+ final int capacity = this.buffer.capacity();
+ // staging area as large as the record space currently in use
+ ByteBuffer staging = ByteBuffer.allocate(capacity - this.freeAddr);
+ short[] packedOffsets = new short[recordNum];
+ int packedBytes = 0;
+
+ for (int slot = 0; slot < recordNum; slot++) {
+ short srcOffset = getOffsetByIndex(slot);
+ // an entry starts with a 4-byte length followed by the key bytes
+ int keyBytes = 4 + getKeyByOffset(srcOffset).getBytes().length;
+
+ // the record body follows the key; read its length from there
+ this.buffer.position(srcOffset + keyBytes);
+ int bodyLen = getRecordLength();
+
+ // window over the whole entry (key + body)
+ this.buffer.clear();
+ this.buffer.position(srcOffset);
+ this.buffer.limit(srcOffset + keyBytes + bodyLen);
+
+ packedBytes += this.buffer.remaining();
+ packedOffsets[slot] = (short) (capacity - packedBytes);
+
+ // fill the staging area from its end towards its start
+ staging.position(staging.capacity() - packedBytes);
+ staging.put(this.buffer);
+ }
+
+ // expose exactly the packed bytes of the staging area
+ staging.clear();
+ staging.position(staging.capacity() - packedBytes);
+ this.freeAddr = (short) (capacity - packedBytes);
+
+ // new offset array right behind the header
+ this.buffer.clear();
+ this.buffer.position(SEG_HEADER_SIZE);
+ this.buffer.asShortBuffer().put(packedOffsets);
+
+ // packed entries at the tail of the buffer
+ this.buffer.position(this.freeAddr);
+ this.buffer.put(staging);
+
+ this.syncBuffer();
+ }
+
+ /**
+ * Delegates to {@code RecordUtils.getRecordLength} on {@code this.buffer}. Callers in this class
+ * position the buffer right behind an entry's key before calling it.
+ */
+ private short getRecordLength() {
+ return RecordUtils.getRecordLength(this.buffer);
}
+ /**
+ * Looks up {@code key} with a binary search over the stored keys and deserializes its record
+ * from a read-only view of the buffer.
+ *
+ * @param key record key
+ * @return the node built from the record, or null if the key is not present
+ */
@Override
public ICachedMNode getRecordByKey(String key) throws MetadataException {
// index means order for target node in keyAddressList, NOT aliasKeyList
- int index = getRecordIndexByKey(key);
+ int index = binarySearchOnKeys(key);
if (index < 0) {
return null;
}
ByteBuffer roBuffer = this.buffer.asReadOnlyBuffer(); // for concurrent
read
- short offset = getOffsetByKeyIndex(index);
+ short offset = getOffsetByIndex(index);
roBuffer.clear();
- roBuffer.position(offset);
+ // the entry starts with a 4-byte length and the key bytes; the record body follows
+ roBuffer.position(offset + key.getBytes().length + 4);
short len = RecordUtils.getRecordLength(roBuffer);
- roBuffer.limit(offset + len);
+ roBuffer.limit(offset + key.getBytes().length + 4 + len);
- return RecordUtils.buffer2Node(keyAddressList.get(index).left, roBuffer);
+ return RecordUtils.buffer2Node(key, roBuffer);
}
+ /**
+ * Finds the record whose alias equals {@code alias} and deserializes it from a read-only view of
+ * the buffer. The key is read back from the entry itself since only the alias is known.
+ *
+ * @param alias alias to look up
+ * @return the node built from the record, or null if no record carries the alias
+ */
@Override
public ICachedMNode getRecordByAlias(String alias) throws MetadataException {
- int ix = getRecordIndexByAlias(alias);
+ int ix = getIndexByAlias(alias);
if (ix < 0) {
return null;
}
ByteBuffer rBuffer = this.buffer.asReadOnlyBuffer();
- short offset = getOffsetByKeyIndex(ix);
- rBuffer.clear().position(offset).limit(offset +
RecordUtils.getRecordLength(rBuffer));
- return RecordUtils.buffer2Node(keyAddressList.get(ix).left, rBuffer);
- }
-
- @Override
- public boolean hasRecordAlias(String alias) {
- return getRecordIndexByAlias(alias) > -1;
+ short offset = getOffsetByIndex(ix);
+ // the view inherits position and limit from this.buffer; reset them before seeking, as the
+ // replaced code did, so a limit left behind by an earlier operation cannot break the read
+ rBuffer.clear();
+ rBuffer.position(offset);
+ String key = ReadWriteIOUtils.readString(rBuffer);
+ rBuffer.limit(rBuffer.position() + RecordUtils.getRecordLength(rBuffer));
+ return RecordUtils.buffer2Node(key, rBuffer);
}
+ /**
+ * Deserializes every record of this segment, in the order of the offset array, from a read-only
+ * view of the buffer.
+ *
+ * @return queue holding one node per record
+ */
@Override
public Queue<ICachedMNode> getAllRecords() throws MetadataException {
- Queue<ICachedMNode> res = new ArrayDeque<>(keyAddressList.size());
+ Queue<ICachedMNode> res = new ArrayDeque<>(recordNum);
ByteBuffer roBuffer = this.buffer.asReadOnlyBuffer();
+
+ short[] offsets = new short[recordNum];
+ // reset the inherited position/limit before reading the offset array, as getOffsets() does
+ roBuffer.clear();
+ roBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
+ roBuffer.asShortBuffer().get(offsets);
+
roBuffer.clear();
- for (Pair<String, Short> p : keyAddressList) {
- roBuffer.limit(roBuffer.capacity());
- roBuffer.position(p.right);
- short len = RecordUtils.getRecordLength(roBuffer);
- roBuffer.limit(p.right + len);
- res.add(RecordUtils.buffer2Node(p.left, roBuffer));
+
+ short len;
+ String key;
+ for (short offset : offsets) {
+ // each entry is key followed by record body; limit the view to the body for deserialization
+ roBuffer.clear();
+ roBuffer.position(offset);
+ key = ReadWriteIOUtils.readString(roBuffer);
+ len = RecordUtils.getRecordLength(roBuffer);
+ roBuffer.limit(roBuffer.position() + len);
+ res.add(RecordUtils.buffer2Node(key, roBuffer));
}
return res;
}
@@ -224,16 +560,14 @@ public class WrappedSegment extends Segment<ICachedMNode>
{
public int updateRecord(String key, ByteBuffer uBuffer)
throws SegmentOverflowException, RecordDuplicatedException {
- int idx = getRecordIndexByKey(key);
+ int idx = binarySearchOnKeys(key);
if (idx < 0) {
return -1;
}
this.buffer.clear();
uBuffer.clear();
- this.buffer.position(keyAddressList.get(idx).right);
-
- String oriAlias = RecordUtils.getRecordAlias(this.buffer);
+ this.buffer.position(getOffsetByIndex(idx) + 4 + key.getBytes().length);
short oriLen = RecordUtils.getRecordLength(this.buffer);
short newLen = (short) uBuffer.capacity();
@@ -242,28 +576,21 @@ public class WrappedSegment extends Segment<ICachedMNode>
{
this.buffer.limit(this.buffer.position() + oriLen);
this.buffer.put(uBuffer);
} else {
- // allocate new space for record, modify key-address list, freeAddr
- if (SchemaFileConfig.SEG_HEADER_SIZE + pairLength + newLen > freeAddr) {
+ // allocate new space for record, update offset array, freeAddr
+ if (SchemaFileConfig.SEG_HEADER_SIZE + pairLength + newLen + 4 +
key.getBytes().length
+ > freeAddr) {
// not enough space
throw new SegmentOverflowException(idx);
}
- freeAddr = (short) (freeAddr - newLen);
+ freeAddr = (short) (freeAddr - newLen - 4 - key.getBytes().length);
// it will not mark old record as expired
this.buffer.position(freeAddr);
- this.buffer.limit(freeAddr + newLen);
- keyAddressList.get(idx).right = freeAddr;
+ ReadWriteIOUtils.write(key, this.buffer);
this.buffer.put(uBuffer);
- }
- // update alias-key list accordingly
- if (oriAlias != null) {
- aliasKeyList.remove(binarySearchPairList(aliasKeyList, oriAlias));
- }
- uBuffer.clear();
- String alias = RecordUtils.getRecordAlias(uBuffer);
- if (alias != null) {
- aliasKeyList.add(binaryInsertPairList(aliasKeyList, alias), new
Pair<>(alias, key));
+ this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE + 2 * idx);
+ this.buffer.putShort(freeAddr);
}
return idx;
@@ -271,85 +598,177 @@ public class WrappedSegment extends
Segment<ICachedMNode> {
+ /**
+ * Removes the entry stored under {@code key} by dropping its slot from the offset array. The
+ * entry's bytes are reclaimed immediately only when the entry sits at {@code freeAddr}; otherwise
+ * they stay in place.
+ *
+ * @param key record key
+ * @return index the entry occupied in the offset array, or -1 if the key is not present
+ */
@Override
public int removeRecord(String key) {
- int idx = getRecordIndexByKey(key);
+ int idx = binarySearchOnKeys(key);
// deletion only seeks for name of ICacheMNode
if (idx < 0) {
return -1;
}
- this.buffer.clear();
- this.buffer.position(keyAddressList.get(idx).right);
-
- // free address pointer forwards if last record removed
- if (keyAddressList.get(idx).right == freeAddr) {
+ // restore space immediately if the last record removed
+ short offset = getOffsetByIndex(idx);
+ if (offset == freeAddr) {
+ this.buffer.position(offset + 4 + key.getBytes().length);
short len = RecordUtils.getRecordLength(this.buffer);
- freeAddr += len;
+ freeAddr += len + 4 + key.getBytes().length;
}
- // update alias-key list accordingly
- String alias = RecordUtils.getRecordAlias(this.buffer);
- if (alias != null && !alias.equals("")) {
- aliasKeyList.remove(binarySearchPairList(aliasKeyList, alias));
+ // shift offsets forward: only the slots behind idx move, so removing the last slot moves
+ // nothing and the loop never reads past the end of the offset array
+ int shift = recordNum - idx - 1;
+ if (shift > 0) {
+ this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE + idx * 2);
+ ShortBuffer lb = this.buffer.asReadOnlyBuffer().asShortBuffer();
+ // skip the slot being removed; it is overwritten by its successor
+ lb.get();
+ while (shift != 0) {
+ this.buffer.putShort(lb.get());
+ shift--;
+ }
}
- // TODO: compact segment further as well
recordNum--;
pairLength -= 2;
- pairLength -= (key.getBytes().length + 4);
- keyAddressList.remove(idx);
return idx;
}
- @Override
- protected short getRecordLength() {
- return RecordUtils.getRecordLength(this.buffer);
- }
// endregion
// region Segment & Record Buffer Operation
+ /**
+ * Rewrites the segment address inside the record stored under {@code key} via
+ * {@code RecordUtils.updateSegAddr}.
+ *
+ * <p>If the key is absent, {@code binarySearchOnKeys} yields -1 and {@code getOffsetByIndex}
+ * throws IndexOutOfBoundsException.
+ *
+ * @param key key of the record to patch
+ * @param newSegAddr address to store in the record
+ */
protected void updateRecordSegAddr(String key, long newSegAddr) {
- int index = getRecordIndexByKey(key);
- short offset = getOffsetByKeyIndex(index);
+ int index = binarySearchOnKeys(key);
+ short offset = getOffsetByIndex(index);
this.buffer.clear();
- this.buffer.position(offset);
+ // skip the 4-byte length and the key bytes to reach the record body
+ this.buffer.position(offset + 4 + key.getBytes().length);
RecordUtils.updateSegAddr(this.buffer, newSegAddr);
}
// endregion
- // region Getters of Record Index or Offset
- /**
- * To decouple search implementation from other methods Rather than offset
of the target key,
- * index could be used to update or remove on keyAddressList
- *
- * @param key Record Key
- * @return index of record, -1 for not found
- */
- private int getRecordIndexByKey(String key) {
- return binarySearchPairList(keyAddressList, key);
+ // region Record Index Access
+ // todo abstract with same name method within Internal
+
+ /**
+ * Reads the entry offset stored in slot {@code index} of the offset array that follows the
+ * segment header.
+ *
+ * @param index slot number, from 0 up to but excluding {@code recordNum}
+ * @return offset of the entry within the buffer
+ * @throws IndexOutOfBoundsException if {@code index} is outside that range
+ */
+ protected short getOffsetByIndex(int index) {
+ boolean inRange = index >= 0 && index < recordNum;
+ if (!inRange) {
+ throw new IndexOutOfBoundsException();
+ }
+ final int slotAddr = SEG_HEADER_SIZE + index * SchemaFileConfig.SEG_OFF_DIG;
+ synchronized (this.buffer) {
+ // widen the limit first so the slot is reachable whatever an earlier operation left behind
+ this.buffer.limit(this.buffer.capacity());
+ this.buffer.position(slotAddr);
+ short offset = ReadWriteIOUtils.readShort(this.buffer);
+ return offset;
+ }
+ }
+
+ /**
+ * Reads the key string stored at {@code offset}, i.e. at the start of an entry.
+ *
+ * @param offset position of the entry within the buffer
+ * @return the key read by {@code ReadWriteIOUtils.readString}
+ */
+ protected String getKeyByOffset(short offset) {
+ synchronized (this.buffer) {
+ final int wholeBuffer = this.buffer.capacity();
+ this.buffer.limit(wholeBuffer);
+ this.buffer.position(offset);
+ String key = ReadWriteIOUtils.readString(this.buffer);
+ return key;
+ }
+ }
+
+ /**
+ * Reads the key of the entry referenced by slot {@code index} of the offset array.
+ *
+ * @param index slot number, from 0 up to but excluding {@code recordNum}
+ * @return key of that entry
+ * @throws IndexOutOfBoundsException if {@code index} is outside that range
+ */
+ private String getKeyByIndex(int index) {
+ boolean inRange = index >= 0 && index < recordNum;
+ if (!inRange) {
+ throw new IndexOutOfBoundsException();
+ }
+ final int slotAddr = SEG_HEADER_SIZE + index * SchemaFileConfig.SEG_OFF_DIG;
+ synchronized (this.buffer) {
+ this.buffer.limit(this.buffer.capacity());
+ this.buffer.position(slotAddr);
+ // the slot holds the entry offset; the entry itself starts with the key
+ short entryOffset = ReadWriteIOUtils.readShort(this.buffer);
+ this.buffer.position(entryOffset);
+ return ReadWriteIOUtils.readString(this.buffer);
+ }
}
/**
- * Notice it figures index of the record within {@link #keyAddressList}
whose alias is target
- * parameter.
- *
- * @param alias
- * @return index of the record within {@link #keyAddressList}
+ * Binary-searches the keys referenced by the offset array, which the search assumes to be in
+ * ascending order, for an exact match of {@code key}.
+ *
+ * @param key record key to look up; must not be null
+ * @return index of the matching slot in the offset array, or -1 if the key is absent
*/
- private int getRecordIndexByAlias(String alias) {
- int aliasIndex = binarySearchPairList(aliasKeyList, alias);
- if (aliasIndex < 0) {
+ protected int binarySearchOnKeys(String key) {
+ int head = 0;
+ int tail = recordNum - 1;
+ // empty segment, or key outside the range spanned by the first and last key
+ if (tail < 0
+ || key.compareTo(getKeyByIndex(head)) < 0
+ || key.compareTo(getKeyByIndex(tail)) > 0) {
return -1;
}
- return binarySearchPairList(keyAddressList,
aliasKeyList.get(aliasIndex).right);
+
+ // both ends are checked up front, so the loop below may treat head and tail as mismatches
+ if (key.compareTo(getKeyByIndex(head)) == 0) {
+ return head;
+ }
+ if (key.compareTo(getKeyByIndex(tail)) == 0) {
+ return tail;
+ }
+
+ int pivot = (head + tail) / 2;
+ while (key.compareTo(getKeyByIndex(pivot)) != 0) {
+ // the pivot landed on an already rejected end: nothing left between head and tail
+ if (head == tail || pivot == head || pivot == tail) {
+ return -1;
+ }
+ if (key.compareTo(getKeyByIndex(pivot)) < 0) {
+ tail = pivot;
+ } else if (key.compareTo(getKeyByIndex(pivot)) > 0) {
+ head = pivot;
+ }
+ pivot = (head + tail) / 2;
+ }
+ return pivot;
}
- private short getOffsetByKeyIndex(int index) {
- return keyAddressList.get(index).right;
+ /**
+ * Finds the slot at which {@code key} has to be inserted to keep the keys referenced by the
+ * offset array in ascending order.
+ *
+ * @param key key about to be inserted
+ * @return insertion index, from 0 up to and including {@code recordNum}
+ * @throws RecordDuplicatedException if a compared key equals {@code key}
+ */
+ protected int binaryInsertOnKeys(String key) throws
RecordDuplicatedException {
+ if (recordNum == 0) {
+ return 0;
+ }
+
+ // NOTE(review): tarIdx is never reassigned, so the final return yields 0 whenever the loop
+ // ends with head == tail; confirm that exit is unreachable for sorted keys
+ int tarIdx = 0;
+ int head = 0;
+ int tail = recordNum - 1;
+
+ if (getKeyByIndex(head).compareTo(key) == 0 ||
getKeyByIndex(tail).compareTo(key) == 0) {
+ throw new RecordDuplicatedException(key);
+ }
+
+ // key sorts before the first or behind the last stored key
+ if (key.compareTo(getKeyByIndex(head)) < 0) {
+ return 0;
+ }
+
+ if (key.compareTo(getKeyByIndex(tail)) > 0) {
+ return recordNum;
+ }
+
+ int pivot;
+ while (head != tail) {
+ pivot = (head + tail) / 2;
+ // notice pivot always smaller than list.size()-1
+ if (getKeyByIndex(pivot).compareTo(key) == 0
+ || getKeyByIndex(pivot + 1).compareTo(key) == 0) {
+ throw new RecordDuplicatedException(key);
+ }
+
+ // key falls strictly between two neighbours: insert behind the pivot
+ if (getKeyByIndex(pivot).compareTo(key) < 0 && getKeyByIndex(pivot +
1).compareTo(key) > 0) {
+ return pivot + 1;
+ }
+
+ // the range cannot shrink any further; decide against the two ends
+ if (pivot == head || pivot == tail) {
+ if (getKeyByIndex(head).compareTo(key) > 0) {
+ return head;
+ }
+ if (getKeyByIndex(tail).compareTo(key) < 0) {
+ return tail + 1;
+ }
+ }
+
+ // impossible for pivot.cmp > 0 and (pivot+1).cmp < 0
+ if (getKeyByIndex(pivot).compareTo(key) > 0) {
+ tail = pivot;
+ }
+
+ if (getKeyByIndex(pivot + 1).compareTo(key) < 0) {
+ head = pivot;
+ }
+ }
+ return tarIdx;
}
// endregion
@@ -358,6 +777,7 @@ public class WrappedSegment extends Segment<ICachedMNode> {
public String toString() {
ByteBuffer bufferR = this.buffer.asReadOnlyBuffer();
StringBuilder builder = new StringBuilder("");
+ List<Pair<String, Short>> keyAddressList = getKeyOffsetList();
builder.append(
String.format(
"[size: %d, K-AL size: %d, spare:%d,",
@@ -366,7 +786,7 @@ public class WrappedSegment extends Segment<ICachedMNode> {
freeAddr - pairLength - SchemaFileConfig.SEG_HEADER_SIZE));
bufferR.clear();
for (Pair<String, Short> pair : keyAddressList) {
- bufferR.position(pair.right);
+ bufferR.position(pair.right + 4 + pair.left.getBytes().length);
if (RecordUtils.getRecordType(bufferR) == 0 ||
RecordUtils.getRecordType(bufferR) == 1) {
builder.append(
String.format(
@@ -391,6 +811,8 @@ public class WrappedSegment extends Segment<ICachedMNode> {
return "";
}
+ List<Pair<String, Short>> keyAddressList = getKeyOffsetList();
+
// Internal/Entity presents as (name, is_aligned, child_segment_address)
// Measurement presents as (name, data_type, encoding, compressor,
alias_if_exist)
ByteBuffer bufferR = this.buffer.asReadOnlyBuffer();
@@ -403,7 +825,7 @@ public class WrappedSegment extends Segment<ICachedMNode> {
freeAddr - pairLength - SchemaFileConfig.SEG_HEADER_SIZE));
bufferR.clear();
for (Pair<String, Short> pair : keyAddressList) {
- bufferR.position(pair.right);
+ bufferR.position(pair.right + pair.left.getBytes().length + 4);
if (RecordUtils.getRecordType(bufferR) == 0 ||
RecordUtils.getRecordType(bufferR) == 1) {
builder.append(
String.format(
@@ -457,13 +879,14 @@ public class WrappedSegment extends Segment<ICachedMNode>
{
@TestOnly
public ByteBuffer getRecord(String key) {
short targetAddr;
- int idx = getRecordIndexByKey(key);
+ int idx = binarySearchOnKeys(key);
if (idx >= 0) {
- targetAddr = keyAddressList.get(idx).right;
+ targetAddr = getOffsetByIndex(idx);
this.buffer.clear();
this.buffer.position(targetAddr);
+ this.buffer.position(targetAddr + 4 + this.buffer.getInt());
short len = RecordUtils.getRecordLength(this.buffer);
- this.buffer.limit(targetAddr + len);
+ this.buffer.limit(this.buffer.position() + len);
return this.buffer.slice();
}
return null;
@@ -471,7 +894,26 @@ public class WrappedSegment extends Segment<ICachedMNode> {
@TestOnly
public List<Pair<String, Short>> getKeyOffsetList() {
- return keyAddressList;
+ // rebuilt on every call from the offset array; each entry starts with its key
+ short[] slots = getOffsets();
+ List<Pair<String, Short>> pairs = new ArrayList<>();
+
+ ByteBuffer view = this.buffer.asReadOnlyBuffer();
+ view.clear();
+ for (int i = 0; i < slots.length; i++) {
+ short entryOffset = slots[i];
+ view.position(entryOffset);
+ String entryKey = ReadWriteIOUtils.readString(view);
+ pairs.add(new Pair<>(entryKey, entryOffset));
+ }
+
+ return pairs;
+ }
+
+ /**
+ * Copies the offset array (one short per record, stored right behind the segment header) out of
+ * a read-only view of the buffer.
+ *
+ * @return the {@code recordNum} entry offsets in slot order
+ */
+ @TestOnly
+ public short[] getOffsets() {
+ short[] slots = new short[recordNum];
+ ByteBuffer view = this.buffer.asReadOnlyBuffer();
+ view.clear();
+ view.position(SEG_HEADER_SIZE);
+ view.asShortBuffer().get(slots);
+ return slots;
}
// endregion