This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0144133481c PBTree Improvement: WrappedSegment Layout Revision for Reduced GC Overhead (#11652)
0144133481c is described below

commit 0144133481ccd95b29673f8dbde2ac65fa7f79a1
Author: ZhaoXin <[email protected]>
AuthorDate: Tue Dec 5 10:21:54 2023 +0800

    PBTree Improvement: WrappedSegment Layout Revision for Reduced GC Overhead (#11652)
---
 .../mtree/impl/pbtree/schemafile/InternalPage.java |  18 +-
 .../mtree/impl/pbtree/schemafile/SchemaPage.java   |   2 +-
 .../mtree/impl/pbtree/schemafile/Segment.java      | 478 --------------
 .../impl/pbtree/schemafile/WrappedSegment.java     | 708 +++++++++++++++++----
 4 files changed, 584 insertions(+), 622 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
index 6e82448b58c..724e97f67bf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
@@ -39,7 +39,7 @@ public class InternalPage extends SchemaPage implements ISegment<Integer, Intege
   private long firstLeaf;
   private int subIndexPage;
 
-  private transient String penultKey, lastKey;
+  private String penultKey, lastKey;
 
   /**
    * <b>Compound pointers will not be deserialized as any Java Objects since it may contains massive
@@ -584,19 +584,17 @@ public class InternalPage extends SchemaPage implements ISegment<Integer, Intege
     }
   }
 
-  private String getKeyByOffset(short offset) {
-    synchronized (this.pageBuffer) {
-      this.pageBuffer.limit(this.pageBuffer.capacity());
-      this.pageBuffer.position(offset);
-      return ReadWriteIOUtils.readString(this.pageBuffer);
-    }
-  }
-
   private String getKeyByIndex(int index) {
     if (index <= 0 || index >= memberNum) {
       throw new IndexOutOfBoundsException();
     }
-    return getKeyByOffset(keyOffset(getPointerByIndex(index)));
+    synchronized (pageBuffer) {
+      this.pageBuffer.limit(this.pageBuffer.capacity());
+      this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + index * COMPOUND_POINT_LENGTH);
+      short ofs = (short) (this.pageBuffer.getLong() & SchemaFileConfig.COMP_PTR_OFFSET_MASK);
+      this.pageBuffer.position(ofs);
+      return ReadWriteIOUtils.readString(this.pageBuffer);
+    }
   }
   // endregion
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
index b9d325df8ce..d72c80b6da3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaPage.java
@@ -42,7 +42,7 @@ public abstract class SchemaPage implements ISchemaPage {
   protected short spareSize; // traces spare space size simultaneously
   protected short memberNum; // amount of the member, definition depends on implementation
 
-  protected volatile boolean dirtyFlag = false; // any modification turns it true
+  protected boolean dirtyFlag = false; // any modification turns it true
 
   protected SchemaPage(ByteBuffer pageBuffer) {
     this.pageBuffer = pageBuffer;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/Segment.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/Segment.java
deleted file mode 100644
index 09c3f09c8ac..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/Segment.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.db.exception.metadata.schemafile.ColossalRecordException;
-import org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig.SEG_HEADER_SIZE;
-
-/**
- * Segments store [String]keys with bytebuffer records, which can be serialized IMNode, bytes of
- * alias string or any other attributes need to store with hierarchy structure.
- *
- * @param <R> type to construct from bytebuffer
- */
-public abstract class Segment<R> implements ISegment<ByteBuffer, R> {
-
-  // members load from buffer
-  protected final ByteBuffer buffer;
-  protected short length, freeAddr, recordNum, pairLength;
-  protected boolean delFlag;
-  protected long prevSegAddress, nextSegAddress;
-
-  // reconstruct from key-address pair buffer
-  protected List<Pair<String, Short>> keyAddressList;
-
-  // assess monotonic
-  protected String penuKey = null, lastKey = null;
-
-  /**
-   * Init Segment with a buffer, which contains all information about this segment
-   *
-   * <p>For a page no more than 16 kib, a signed short is enough to index all bytes inside a
-   * segment.
-   *
-   * <p><b>Segment Structure:</b>
-   * <li>25 byte: header
-   * <li>1 short: length, segment length
-   * <li>1 short: freeAddr, start offset of records
-   * <li>1 short: recordNum, amount of records in this segment
-   * <li>1 short: pairLength, length of key-address in bytes
-   * <li>1 long (8 bytes): prevSegIndex, previous segment index
-   * <li>1 long (8 bytes): nextSegIndex, next segment index
-   * <li>1 bit: delFlag, delete flag <br>
-   *     (--- checksum, parent record address, max/min record key may be contained further ---)
-   * <li>var length: key-address pairs, begin at 25 bytes offset, length of pairLength <br>
-   *     ... empty space ...
-   * <li>var length: records
-   */
-  protected Segment(ByteBuffer buffer, boolean override) {
-    this.buffer = buffer;
-
-    this.buffer.clear();
-    if (override) {
-      // blank segment
-      length = (short) buffer.capacity();
-      freeAddr = (short) buffer.capacity();
-      recordNum = 0;
-      pairLength = 0;
-
-      // these two address need to be initiated as same as in childrenContainer
-      prevSegAddress = -1;
-      nextSegAddress = -1;
-      delFlag = false;
-      // parRecord = lastSegAddr = nextSegAddr = 0L;
-
-      keyAddressList = new ArrayList<>();
-    } else {
-      length = ReadWriteIOUtils.readShort(buffer);
-      freeAddr = ReadWriteIOUtils.readShort(buffer);
-      recordNum = ReadWriteIOUtils.readShort(buffer);
-      pairLength = ReadWriteIOUtils.readShort(buffer);
-
-      // parRecord = ReadWriteIOUtils.readLong(buffer);
-      prevSegAddress = ReadWriteIOUtils.readLong(buffer);
-      nextSegAddress = ReadWriteIOUtils.readLong(buffer);
-      delFlag = ReadWriteIOUtils.readBool(buffer);
-
-      buffer.position(SEG_HEADER_SIZE);
-      buffer.limit(SEG_HEADER_SIZE + pairLength);
-      ByteBuffer pairBuffer = buffer.slice();
-      buffer.clear(); // reconstruction finished, reset buffer position and limit
-      reconstructKeyAddress(pairBuffer);
-    }
-  }
-
-  private void reconstructKeyAddress(ByteBuffer pairBuffer) {
-    keyAddressList = new ArrayList<>();
-    for (int idx = 0; idx < recordNum; idx++) {
-      String key = ReadWriteIOUtils.readString(pairBuffer);
-      Short address = ReadWriteIOUtils.readShort(pairBuffer);
-      keyAddressList.add(new Pair<>(key, address));
-    }
-  }
-
-  @Override
-  public boolean hasRecordKey(String key) {
-    return binarySearchPairList(keyAddressList, key) > -1;
-  }
-
-  @Override
-  public boolean hasRecordAlias(String alias) {
-    return false;
-  }
-
-  @Override
-  public void syncBuffer() {
-    ByteBuffer prefBuffer = ByteBuffer.allocate(SEG_HEADER_SIZE + pairLength);
-
-    ReadWriteIOUtils.write(length, prefBuffer);
-    ReadWriteIOUtils.write(freeAddr, prefBuffer);
-    ReadWriteIOUtils.write(recordNum, prefBuffer);
-    ReadWriteIOUtils.write(pairLength, prefBuffer);
-    ReadWriteIOUtils.write(prevSegAddress, prefBuffer);
-    ReadWriteIOUtils.write(nextSegAddress, prefBuffer);
-    ReadWriteIOUtils.write(delFlag, prefBuffer);
-
-    prefBuffer.position(SEG_HEADER_SIZE);
-
-    for (Pair<String, Short> pair : keyAddressList) {
-      ReadWriteIOUtils.write(pair.left, prefBuffer);
-      ReadWriteIOUtils.write(pair.right, prefBuffer);
-    }
-
-    prefBuffer.clear();
-    this.buffer.clear();
-    this.buffer.put(prefBuffer);
-  }
-
-  @Override
-  public short size() {
-    return length;
-  }
-
-  @Override
-  public short getSpareSize() {
-    return (short) (freeAddr - pairLength - SEG_HEADER_SIZE);
-  }
-
-  @Override
-  public void delete() {
-    this.delFlag = true;
-    this.buffer.clear();
-    this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE - 1);
-    ReadWriteIOUtils.write(true, this.buffer);
-  }
-
-  @Override
-  public long getNextSegAddress() {
-    return nextSegAddress;
-  }
-
-  @Override
-  public void setNextSegAddress(long nextSegAddress) {
-    this.nextSegAddress = nextSegAddress;
-  }
-
-  @Override
-  public void extendsTo(ByteBuffer newBuffer) throws MetadataException {
-    short sizeGap = (short) (newBuffer.capacity() - length);
-
-    if (sizeGap < 0) {
-      throw new MetadataException("Leaf Segment cannot extend to a smaller buffer.");
-    }
-
-    this.buffer.clear();
-    newBuffer.clear();
-
-    if (sizeGap == 0) {
-      this.syncBuffer();
-      this.buffer.clear();
-      newBuffer.put(this.buffer);
-      this.buffer.clear();
-      newBuffer.clear();
-      return;
-    }
-
-    ReadWriteIOUtils.write((short) newBuffer.capacity(), newBuffer);
-    ReadWriteIOUtils.write((short) (freeAddr + sizeGap), newBuffer);
-    ReadWriteIOUtils.write(recordNum, newBuffer);
-    ReadWriteIOUtils.write(pairLength, newBuffer);
-    ReadWriteIOUtils.write(prevSegAddress, newBuffer);
-    ReadWriteIOUtils.write(nextSegAddress, newBuffer);
-    ReadWriteIOUtils.write(delFlag, newBuffer);
-
-    newBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
-    for (Pair<String, Short> pair : keyAddressList) {
-      ReadWriteIOUtils.write(pair.left, newBuffer);
-      ReadWriteIOUtils.write((short) (pair.right + sizeGap), newBuffer);
-    }
-
-    this.buffer.clear();
-    this.buffer.position(freeAddr);
-    this.buffer.limit(length);
-    newBuffer.position(freeAddr + sizeGap);
-    newBuffer.put(this.buffer);
-    newBuffer.clear();
-    this.buffer.clear();
-  }
-
-  @Override
-  public ByteBuffer resetBuffer(int ptr) {
-    freeAddr = (short) this.buffer.capacity();
-    recordNum = 0;
-    pairLength = 0;
-    prevSegAddress = -1;
-    nextSegAddress = -1;
-    keyAddressList.clear();
-    syncBuffer();
-    this.buffer.clear();
-    return this.buffer.slice();
-  }
-
-  @Override
-  public String splitByKey(
-      String key, ByteBuffer recBuf, ByteBuffer dstBuffer, boolean inclineSplit)
-      throws MetadataException {
-
-    if (this.buffer.capacity() != dstBuffer.capacity()) {
-      throw new MetadataException("Segments only splits with same capacity.");
-    }
-
-    if (keyAddressList.size() == 0) {
-      throw new MetadataException("Segment can not be split with no records.");
-    }
-
-    if (key == null && keyAddressList.size() == 1) {
-      throw new MetadataException("Segment can not be split with only one record.");
-    }
-
-    // notice that key can be null here
-    boolean monotonic =
-        penuKey != null
-            && key != null
-            && lastKey != null
-            && inclineSplit
-            && (key.compareTo(lastKey)) * (lastKey.compareTo(penuKey)) > 0;
-
-    int n = keyAddressList.size();
-
-    // actual index of key just smaller than the insert, -2 for null key
-    int pos = key != null ? binaryInsertPairList(keyAddressList, key) - 1 : -2;
-
-    int sp; // virtual index to split
-    if (monotonic) {
-      // new entry into part with more space
-      sp = key.compareTo(lastKey) > 0 ? Math.max(pos + 1, n / 2) : Math.min(pos + 2, n / 2);
-    } else {
-      sp = n / 2;
-    }
-
-    // little different from InternalSegment, only the front edge key can not split
-    sp = sp <= 0 ? 1 : sp;
-
-    // prepare header for dstBuffer
-    short length = this.length,
-        freeAddr = (short) dstBuffer.capacity(),
-        recordNum = 0,
-        pairLength = 0;
-    boolean delFlag = false;
-    long prevSegAddress = this.prevSegAddress, nextSegAddress = this.nextSegAddress;
-
-    int recSize, keySize;
-    String mKey, sKey = null;
-    ByteBuffer srcBuf;
-    int aix; // aix for actual index on keyAddressList
-    n = key == null ? n - 1 : n; // null key
-    // TODO: implement bulk split further
-    for (int ix = sp; ix <= n; ix++) {
-      if (ix == pos + 1) {
-        // migrate newly insert
-        srcBuf = recBuf;
-        recSize = recBuf.capacity();
-        mKey = key;
-        keySize = 4 + mKey.getBytes().length;
-
-        recBuf.clear();
-      } else {
-        srcBuf = this.buffer;
-        // pos equals -2 if key is null
-        aix = (ix > pos) && (pos != -2) ? ix - 1 : ix;
-        mKey = keyAddressList.get(aix).left;
-        keySize = 4 + mKey.getBytes().length;
-
-        // prepare on this.buffer
-        this.buffer.clear();
-        this.buffer.position(keyAddressList.get(aix).right);
-        recSize = getRecordLength();
-        this.buffer.limit(this.buffer.position() + recSize);
-
-        this.recordNum--;
-      }
-
-      if (ix == sp) {
-        // search key is the first key in split segment
-        sKey = mKey;
-      }
-
-      freeAddr -= recSize;
-      dstBuffer.position(freeAddr);
-      dstBuffer.put(srcBuf);
-
-      dstBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE + pairLength);
-      ReadWriteIOUtils.write(mKey, dstBuffer);
-      ReadWriteIOUtils.write(freeAddr, dstBuffer);
-
-      recordNum++;
-      pairLength += keySize + 2;
-    }
-
-    // compact and update status
-    this.keyAddressList = this.keyAddressList.subList(0, this.recordNum);
-    compactRecords();
-    if (sp > pos + 1 && key != null) {
-      // new insert shall be in this
-      if (insertRecord(key, recBuf) < 0) {
-        throw new ColossalRecordException(key, recBuf.capacity());
-      }
-    }
-
-    // flush dstBuffer header
-    dstBuffer.clear();
-    ReadWriteIOUtils.write(length, dstBuffer);
-    ReadWriteIOUtils.write(freeAddr, dstBuffer);
-    ReadWriteIOUtils.write(recordNum, dstBuffer);
-    ReadWriteIOUtils.write(pairLength, dstBuffer);
-    ReadWriteIOUtils.write(prevSegAddress, dstBuffer);
-    ReadWriteIOUtils.write(nextSegAddress, dstBuffer);
-    ReadWriteIOUtils.write(delFlag, dstBuffer);
-
-    penuKey = null;
-    lastKey = null;
-    return sKey;
-  }
-
-  protected void compactRecords() {
-    // compact by existed item on keyAddressList
-    ByteBuffer tempBuf = ByteBuffer.allocate(this.buffer.capacity() - this.freeAddr);
-    int accSiz = 0;
-    this.pairLength = 0;
-    for (Pair<String, Short> pair : keyAddressList) {
-      this.pairLength += pair.left.getBytes().length + 4 + 2;
-      this.buffer.clear();
-      this.buffer.position(pair.right);
-      this.buffer.limit(pair.right + getRecordLength());
-
-      accSiz += this.buffer.remaining();
-      pair.right = (short) (this.buffer.capacity() - accSiz);
-
-      tempBuf.position(tempBuf.capacity() - accSiz);
-      tempBuf.put(this.buffer);
-    }
-    tempBuf.clear();
-    tempBuf.position(tempBuf.capacity() - accSiz);
-    this.freeAddr = (short) (this.buffer.capacity() - accSiz);
-
-    this.buffer.clear();
-    this.buffer.position(this.freeAddr);
-    this.buffer.put(tempBuf);
-
-    this.syncBuffer();
-  }
-
-  /** Assuming that buffer has been set position well, record length depends on implementation. */
-  protected abstract short getRecordLength();
-
-  protected static <T> int binarySearchPairList(List<Pair<String, T>> list, String key) {
-    int head = 0;
-    int tail = list.size() - 1;
-    if (tail < 0
-        || key.compareTo(list.get(head).left) < 0
-        || key.compareTo(list.get(tail).left) > 0) {
-      return -1;
-    }
-    if (key.compareTo(list.get(head).left) == 0) {
-      return head;
-    }
-    if (key.compareTo(list.get(tail).left) == 0) {
-      return tail;
-    }
-    int pivot = (head + tail) / 2;
-    while (key.compareTo(list.get(pivot).left) != 0) {
-      if (head == tail || pivot == head || pivot == tail) {
-        return -1;
-      }
-      if (key.compareTo(list.get(pivot).left) < 0) {
-        tail = pivot;
-      } else if (key.compareTo(list.get(pivot).left) > 0) {
-        head = pivot;
-      }
-      pivot = (head + tail) / 2;
-    }
-    return pivot;
-  }
-
-  /**
-   * @return target index the record with passing in key should be inserted
-   * @throws RecordDuplicatedException
-   */
-  protected static <T> int binaryInsertPairList(List<Pair<String, T>> list, String key)
-      throws RecordDuplicatedException {
-    if (list.isEmpty()) {
-      return 0;
-    }
-
-    int tarIdx = 0;
-    int head = 0;
-    int tail = list.size() - 1;
-
-    if (list.get(head).left.compareTo(key) == 0 || list.get(tail).left.compareTo(key) == 0) {
-      throw new RecordDuplicatedException(key);
-    }
-
-    if (key.compareTo(list.get(head).left) < 0) {
-      return 0;
-    }
-
-    if (key.compareTo(list.get(tail).left) > 0) {
-      return list.size();
-    }
-
-    int pivot;
-    while (head != tail) {
-      pivot = (head + tail) / 2;
-      // notice pivot always smaller than list.size()-1
-      if (list.get(pivot).left.compareTo(key) == 0
-          || list.get(pivot + 1).left.compareTo(key) == 0) {
-        throw new RecordDuplicatedException(key);
-      }
-
-      if (list.get(pivot).left.compareTo(key) < 0 && list.get(pivot + 1).left.compareTo(key) > 0) {
-        return pivot + 1;
-      }
-
-      if (pivot == head || pivot == tail) {
-        if (list.get(head).left.compareTo(key) > 0) {
-          return head;
-        }
-        if (list.get(tail).left.compareTo(key) < 0) {
-          return tail + 1;
-        }
-      }
-
-      // impossible for pivot.cmp > 0 and (pivot+1).cmp < 0
-      if (list.get(pivot).left.compareTo(key) > 0) {
-        tail = pivot;
-      }
-
-      if (list.get(pivot + 1).left.compareTo(key) < 0) {
-        head = pivot;
-      }
-    }
-    return tarIdx;
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
index a5042fa67f6..c535a811a05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.schemafile.ColossalRecordException;
 import org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
 import org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -31,49 +32,75 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.nio.ShortBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig.SEG_HEADER_SIZE;
+
 /**
- * This class initiate a segment object with corresponding bytes. Implements add, get, remove
- * records which is serialized ICacheMNode. <br>
+ * The physical instance that stores buffer of IMNodes. <br>
  * Act like a wrapper of a bytebuffer which reflects a segment. <br>
  * And itself is wrapped inside a SchemaPage.
  */
-public class WrappedSegment extends Segment<ICachedMNode> {
+public class WrappedSegment implements ISegment<ByteBuffer, ICachedMNode> {
+
+  // members load from buffer
+  protected final ByteBuffer buffer;
+  protected short length, freeAddr, recordNum, pairLength;
+  protected boolean delFlag, aliasFlag;
+  protected long prevSegAddress, nextSegAddress;
 
-  // reconstruct every initiation after keyAddressList but not write into buffer
-  private List<Pair<String, String>> aliasKeyList;
+  // assess monotonic
+  protected String penuKey = null, lastKey = null;
 
   /**
-   * Init Segment with a buffer, which contains all information about this segment
-   *
-   * <p>For a page no more than 16 kib, a signed short is enough to index all bytes inside a
-   * segment.
-   *
-   * <p><b>Segment Structure:</b>
-   * <li>25 byte: header
+   * <b>Segment Structure:</b>
+   * <li>(25 byte: header)
    * <li>1 short: length, segment length
    * <li>1 short: freeAddr, start offset of records
    * <li>1 short: recordNum, amount of records in this segment
    * <li>1 short: pairLength, length of key-address in bytes
    * <li>1 long (8 bytes): prevSegIndex, previous segment index
    * <li>1 long (8 bytes): nextSegIndex, next segment index
-   * <li>1 bit: delFlag, delete flag <br>
+   * <li>1 bit: delFlag, delete flag
+   * <li>1 bit: aliasFlag, whether alias existed<br>
    *     (--- checksum, parent record address, max/min record key may be contained further ---)
-   * <li>var length: key-address pairs, begin at 25 bytes offset, length of pairLength <br>
+   * <li><s>var length: key-address pairs, begin at 25 bytes offset, length of pairLength </s><br>
+   * <li>var length: ORDERED record offset<br>
    *     ... empty space ...
-   * <li>var length: records
+   * <li>var length: (record key, record body) * recordNum
    */
-  public WrappedSegment(ByteBuffer buffer, boolean override) throws RecordDuplicatedException {
-    super(buffer, override);
+  public WrappedSegment(ByteBuffer buffer, boolean override) {
+    this.buffer = buffer;
+    this.buffer.clear();
 
     if (override) {
-      aliasKeyList = new ArrayList<>();
+      // blank segment
+      length = (short) buffer.capacity();
+      freeAddr = (short) buffer.capacity();
+      recordNum = 0;
+      pairLength = 0;
+
+      // these two address need to be initiated as same as in childrenContainer
+      prevSegAddress = -1;
+      nextSegAddress = -1;
+      delFlag = false;
+      aliasFlag = false;
+
     } else {
-      reconstructAliasAddressList();
+      length = ReadWriteIOUtils.readShort(buffer);
+      freeAddr = ReadWriteIOUtils.readShort(buffer);
+      recordNum = ReadWriteIOUtils.readShort(buffer);
+      pairLength = ReadWriteIOUtils.readShort(buffer);
+
+      prevSegAddress = ReadWriteIOUtils.readLong(buffer);
+      nextSegAddress = ReadWriteIOUtils.readLong(buffer);
+      byte flags = ReadWriteIOUtils.readByte(buffer);
+      delFlag = (0x80 & flags) != 0;
+      aliasFlag = (0x40 & flags) != 0;
     }
   }
 
@@ -102,54 +129,120 @@ public class WrappedSegment extends Segment<ICachedMNode> {
     return new WrappedSegment(buffer, false);
   }
 
-  private void reconstructAliasAddressList() throws RecordDuplicatedException {
-    if (aliasKeyList != null) {
-      aliasKeyList.clear();
-    } else {
-      aliasKeyList = new ArrayList<>();
-    }
+  // region Interface Implementation
 
-    ByteBuffer bufferR = this.buffer.asReadOnlyBuffer();
-    bufferR.clear();
-    for (Pair<String, Short> p : keyAddressList) {
-      if (p.right >= 0) {
-        bufferR.position(p.right);
-        String alias = RecordUtils.getRecordAlias(bufferR);
-        if (alias != null) {
-          aliasKeyList.add(binaryInsertPairList(aliasKeyList, alias), new Pair<>(alias, p.left));
-        }
-      }
-    }
+  @Override
+  public boolean hasRecordKey(String key) {
+    return binarySearchOnKeys(key) > -1;
   }
 
-  // region Interface Implementation
+  @Override
+  public boolean hasRecordAlias(String alias) {
+    return getIndexByAlias(alias) > -1;
+  }
+
+  @Override
+  public synchronized void syncBuffer() {
+    ByteBuffer prefBuffer = ByteBuffer.allocate(SEG_HEADER_SIZE);
+
+    ReadWriteIOUtils.write(length, prefBuffer);
+    ReadWriteIOUtils.write(freeAddr, prefBuffer);
+    ReadWriteIOUtils.write(recordNum, prefBuffer);
+    ReadWriteIOUtils.write(pairLength, prefBuffer);
+    ReadWriteIOUtils.write(prevSegAddress, prefBuffer);
+    ReadWriteIOUtils.write(nextSegAddress, prefBuffer);
+    ReadWriteIOUtils.write(getFlag(), prefBuffer);
+
+    prefBuffer.clear();
+    this.buffer.clear();
+    this.buffer.put(prefBuffer);
+  }
+
+  private byte getFlag() {
+    byte flags = delFlag ? (byte) 0x80 : (byte) 0x00;
+    flags = (byte) (aliasFlag ? flags | 0x40 : flags | 0x00);
+    return flags;
+  }
+
+  @Override
+  public short size() {
+    return length;
+  }
+
+  @Override
+  public short getSpareSize() {
+    return (short) (freeAddr - pairLength - SEG_HEADER_SIZE);
+  }
+
+  @Override
+  public void delete() {
+    this.delFlag = true;
+    this.buffer.clear();
+    this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE - 1);
+    ReadWriteIOUtils.write(getFlag(), this.buffer);
+  }
+
+  @Override
+  public long getNextSegAddress() {
+    return nextSegAddress;
+  }
+
+  @Override
+  public void setNextSegAddress(long nextSegAddress) {
+    this.nextSegAddress = nextSegAddress;
+  }
 
   @Override
   public synchronized int insertRecord(String key, ByteBuffer buf)
       throws RecordDuplicatedException {
     buf.clear();
 
-    int recordStartAddr = freeAddr - buf.capacity();
+    // key and body store adjacently
+    byte[] ikBytes = key.getBytes();
+    int recordStartAddr = freeAddr - buf.capacity() - 4 - ikBytes.length;
 
-    int newPairLength = pairLength + key.getBytes().length + 4 + 2;
+    int newPairLength = pairLength + 2;
     if (recordStartAddr < SchemaFileConfig.SEG_HEADER_SIZE + newPairLength) {
       return -1;
     }
     pairLength = (short) newPairLength;
 
-    int tarIdx = binaryInsertPairList(keyAddressList, key);
+    int tarIdx = binaryInsertOnKeys(key);
 
-    // update aliasKeyList
+    // fixme EXPENSIVE cross check of duplication between name and alias
+    if (aliasFlag && getIndexByAlias(key) != -1) {
+      throw new RecordDuplicatedException(
+          String.format("Record [%s] has conflict name with alias of its siblings.", key));
+    }
+
+    // check alias-key duplication, set flag if necessary
     String alias = RecordUtils.getRecordAlias(buf);
     if (alias != null && !alias.equals("")) {
-      aliasKeyList.add(binaryInsertPairList(aliasKeyList, alias), new Pair<>(alias, key));
+      if (binarySearchOnKeys(alias) >= 0 || getIndexByAlias(alias) != -1) {
+        throw new RecordDuplicatedException(
+            String.format("Record [%s] has conflict alias [%s] with its siblings.", key, alias));
+      }
+      aliasFlag = true;
     }
 
     buf.clear();
     this.buffer.clear();
     this.buffer.position(recordStartAddr);
+    ReadWriteIOUtils.write(key, this.buffer);
     this.buffer.put(buf);
-    keyAddressList.add(tarIdx, new Pair<>(key, (short) recordStartAddr));
+
+    this.buffer.clear().position(SchemaFileConfig.SEG_HEADER_SIZE + tarIdx * 2);
+    int shiftOffsets = recordNum - tarIdx;
+    if (shiftOffsets > 0) {
+      short[] shifts = new short[shiftOffsets];
+      this.buffer.asShortBuffer().get(shifts);
+      this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE + tarIdx * 2);
+      this.buffer.putShort((short) recordStartAddr);
+      this.buffer.asShortBuffer().put(shifts);
+    } else {
+      this.buffer.putShort((short) recordStartAddr);
+    }
+
     this.freeAddr = (short) recordStartAddr;
     this.recordNum++;
 
@@ -158,64 +251,307 @@ public class WrappedSegment extends 
Segment<ICachedMNode> {
     return recordStartAddr - pairLength - SchemaFileConfig.SEG_HEADER_SIZE;
   }
 
+  /**
+   * Scans every record of this segment in offset-array order for the one whose alias equals the
+   * target.
+   *
+   * @param target alias to look for
+   * @return index of the matching record within the offset array, -1 if no record matches
+   */
+  private int getIndexByAlias(String target) {
+    // a read-only view keeps position and limit of the shared buffer untouched
+    ByteBuffer view = this.buffer.asReadOnlyBuffer();
+    view.position(SchemaFileConfig.SEG_HEADER_SIZE);
+    view.limit(SchemaFileConfig.SEG_HEADER_SIZE + pairLength);
+
+    // the offset array starts right behind the segment header
+    short[] recordOffsets = new short[recordNum];
+    view.asShortBuffer().get(recordOffsets);
+    view.clear();
+
+    int idx = 0;
+    for (short recordOffset : recordOffsets) {
+      // an entry starts with an int key length followed by the key bytes; skip both
+      view.position(recordOffset);
+      int keyLength = view.getInt();
+      view.position(view.position() + keyLength);
+      if (target.equals(RecordUtils.getRecordAlias(view))) {
+        return idx;
+      }
+      idx++;
+    }
+    return -1;
+  }
+
+  /**
+   * Copies this segment into {@code newBuffer}, which must not be smaller than this segment. If
+   * the target is larger, the record area is moved to the tail of the target and the free address
+   * as well as every record offset is shifted by the size difference.
+   *
+   * @param newBuffer target buffer, written from its beginning
+   * @throws MetadataException if {@code newBuffer} is smaller than this segment
+   */
+  @Override
+  public void extendsTo(ByteBuffer newBuffer) throws MetadataException {
+    final short growth = (short) (newBuffer.capacity() - length);
+    if (growth < 0) {
+      throw new MetadataException("Leaf Segment cannot extend to a smaller buffer.");
+    }
+
+    this.buffer.clear();
+    newBuffer.clear();
+
+    if (growth != 0) {
+      // header: new capacity and shifted free address, remaining fields as they are
+      ReadWriteIOUtils.write((short) newBuffer.capacity(), newBuffer);
+      ReadWriteIOUtils.write((short) (freeAddr + growth), newBuffer);
+      ReadWriteIOUtils.write(recordNum, newBuffer);
+      ReadWriteIOUtils.write(pairLength, newBuffer);
+      ReadWriteIOUtils.write(prevSegAddress, newBuffer);
+      ReadWriteIOUtils.write(nextSegAddress, newBuffer);
+      ReadWriteIOUtils.write(getFlag(), newBuffer);
+
+      // offset array: same order, every address shifted by the growth
+      this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
+      newBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
+      int pending = recordNum;
+      while (pending-- > 0) {
+        newBuffer.putShort((short) (this.buffer.getShort() + growth));
+      }
+
+      // bytes in [freeAddr, length) are copied to the shifted address in the new buffer
+      this.buffer.clear();
+      this.buffer.position(freeAddr);
+      this.buffer.limit(length);
+      newBuffer.position(freeAddr + growth);
+      newBuffer.put(this.buffer);
+    } else {
+      // same size: sync first, then copy the whole buffer as it is
+      this.syncBuffer();
+      this.buffer.clear();
+      newBuffer.put(this.buffer);
+    }
+
+    // both buffers are left cleared
+    this.buffer.clear();
+    newBuffer.clear();
+  }
+
+  /**
+   * Resets this segment to an empty state and syncs that state into the buffer.
+   *
+   * @param ptr not read by this implementation
+   * @return a slice over the whole underlying buffer
+   */
+  @Override
+  public ByteBuffer resetBuffer(int ptr) {
+    // flags
+    aliasFlag = false;
+    delFlag = false;
+
+    // links to sibling segments
+    nextSegAddress = -1;
+    prevSegAddress = -1;
+
+    // no records left: empty offset array, free address at the very end
+    pairLength = 0;
+    recordNum = 0;
+    freeAddr = (short) this.buffer.capacity();
+
+    syncBuffer();
+    this.buffer.clear();
+    return this.buffer.slice();
+  }
+
+  /**
+   * Splits this segment: entries from the chosen split index onward are written into {@code
+   * dstBuffer} together with a fresh header, the remaining entries are compacted within this
+   * segment. The record to insert ({@code key}/{@code recBuf}) is placed into whichever part its
+   * sort position falls in.
+   *
+   * @param key key of the record to insert, {@code null} for an even split without a new record
+   * @param recBuf buffer of the record to insert, copied from its beginning
+   * @param dstBuffer buffer of the new segment, must have the same capacity as this segment
+   * @param inclineSplit whether the split index may be biased when {@code key}, {@code lastKey}
+   *     and {@code penuKey} are ordered in one direction
+   * @return the first key written into {@code dstBuffer}
+   * @throws MetadataException if capacities differ, if there is no record, if there is only one
+   *     record and {@code key} is null, or if the new record cannot be inserted into this segment
+   */
   @Override
   public synchronized String splitByKey(
       String key, ByteBuffer recBuf, ByteBuffer dstBuffer, boolean 
inclineSplit)
       throws MetadataException {
-    String sk = super.splitByKey(key, recBuf, dstBuffer, inclineSplit);
-    reconstructAliasAddressList();
-    return sk;
+
+    if (this.buffer.capacity() != dstBuffer.capacity()) {
+      throw new MetadataException("Segments only splits with same capacity.");
+    }
+
+    if (recordNum == 0) {
+      throw new MetadataException("Segment can not be split with no records.");
+    }
+
+    if (key == null && recordNum == 1) {
+      throw new MetadataException("Segment can not be split with only one 
record.");
+    }
+
+    // notice that key can be null here, and a null key means even split
+    // the product is positive only if both comparisons have the same sign, i.e. penuKey, lastKey
+    // and key are strictly ordered in one direction
+    boolean monotonic =
+        penuKey != null
+            && key != null
+            && lastKey != null
+            && inclineSplit
+            && (key.compareTo(lastKey)) * (lastKey.compareTo(penuKey)) > 0;
+
+    int n = recordNum;
+
+    // actual index of key just smaller than the insert, -2 for null key
+    int pos = key != null ? binaryInsertOnKeys(key) - 1 : -2;
+
+    int sp; // virtual index to split
+    if (monotonic) {
+      // new entry into part with more space
+      sp = key.compareTo(lastKey) > 0 ? Math.max(pos + 1, n / 2) : 
Math.min(pos + 2, n / 2);
+    } else {
+      sp = n / 2;
+    }
+
+    // little different from InternalSegment, only the front edge key can not 
split
+    sp = sp <= 0 ? 1 : sp;
+
+    // number of existing records that stay in this segment, decreased per migrated record
+    short recordLeft = this.recordNum;
+    // prepare header for dstBuffer
+    // from here on these locals shadow the fields of the same name and describe dstBuffer only;
+    // the fields of this segment are reachable through this.xxx
+    short length = this.length,
+        freeAddr = (short) dstBuffer.capacity(),
+        recordNum = 0,
+        pairLength = 0;
+    long prevSegAddress = this.prevSegAddress, nextSegAddress = 
this.nextSegAddress;
+
+    int recSize, keySize;
+    String mKey, sKey = null;
+    ByteBuffer srcBuf;
+    // aix is the index into the offset array of this segment, while ix below is a virtual index
+    // that counts the record to insert as if it already sat at position pos + 1
+    int aix; // aix for actual index on keyAddressList
+    n = key == null ? n - 1 : n; // null key
+    // TODO: implement bulk split further
+    for (int ix = sp; ix <= n; ix++) {
+      if (ix == pos + 1) {
+        // migrate newly insert
+        srcBuf = recBuf;
+        recSize = recBuf.capacity();
+        mKey = key;
+        keySize = 4 + mKey.getBytes().length;
+
+        recBuf.clear();
+      } else {
+        srcBuf = this.buffer;
+        // pos equals -2 if key is null
+        aix = (ix > pos) && (pos != -2) ? ix - 1 : ix;
+        mKey = getKeyByIndex(aix);
+        keySize = 4 + mKey.getBytes().length;
+
+        // prepare on this.buffer
+        // position behind the stored key so that only the record body is copied below
+        this.buffer.clear();
+        this.buffer.position(getOffsetByIndex(aix) + keySize);
+        recSize = getRecordLength();
+        this.buffer.limit(this.buffer.position() + recSize);
+
+        recordLeft--;
+      }
+
+      if (ix == sp) {
+        // search key is the first key in split segment
+        sKey = mKey;
+      }
+
+      // entries are written downwards from the end of dstBuffer as key followed by record
+      freeAddr -= recSize + keySize;
+      dstBuffer.position(freeAddr);
+      ReadWriteIOUtils.write(mKey, dstBuffer);
+      dstBuffer.put(srcBuf);
+
+      // append the address of the entry to the offset array of dstBuffer
+      dstBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE + pairLength);
+      ReadWriteIOUtils.write(freeAddr, dstBuffer);
+
+      recordNum++;
+      pairLength += 2;
+    }
+
+    // compact and update status
+    // NOTE(review): only this.recordNum is lowered here; the pairLength field is shadowed by the
+    // local above and is not adjusted in this method - confirm it is re-derived elsewhere
+    this.recordNum = recordLeft;
+    compactRecords();
+    if (sp > pos + 1 && key != null) {
+      // new insert shall be in this
+      if (insertRecord(key, recBuf) < 0) {
+        throw new ColossalRecordException(key, recBuf.capacity());
+      }
+    }
+
+    // flush dstBuffer header
+    dstBuffer.clear();
+    ReadWriteIOUtils.write(length, dstBuffer);
+    ReadWriteIOUtils.write(freeAddr, dstBuffer);
+    ReadWriteIOUtils.write(recordNum, dstBuffer);
+    ReadWriteIOUtils.write(pairLength, dstBuffer);
+    ReadWriteIOUtils.write(prevSegAddress, dstBuffer);
+    ReadWriteIOUtils.write(nextSegAddress, dstBuffer);
+    // FIXME flag of split page is not always the same
+    ReadWriteIOUtils.write(getFlag(), dstBuffer);
+
+    // the recorded insert history is dropped after a split
+    penuKey = null;
+    lastKey = null;
+    return sKey;
+  }
+
+  /**
+   * Rewrites the entries referenced by the first {@code recordNum} offsets contiguously at the
+   * tail of the buffer, then refreshes the offset array, {@code freeAddr} and {@code pairLength}
+   * and syncs the buffer. Entries that are no longer referenced are dropped this way.
+   *
+   * <p>Callers lower {@code recordNum} before calling (see splitByKey), hence {@code pairLength}
+   * is re-derived from {@code recordNum} here rather than trusted.
+   */
+  protected void compactRecords() {
+    // staging area big enough for all bytes currently occupied behind freeAddr
+    ByteBuffer tempBuf = ByteBuffer.allocate(this.buffer.capacity() - this.freeAddr);
+    int accSiz = 0;
+    short[] newOffsets = new short[recordNum];
+
+    for (int i = 0; i < recordNum; i++) {
+      short migOffset = getOffsetByIndex(i);
+      // an entry is stored as int key length, key bytes, record
+      int keyLen = 4 + getKeyByOffset(migOffset).getBytes().length;
+      this.buffer.position(migOffset + keyLen);
+      int recLen = getRecordLength();
+
+      // window over the whole entry
+      this.buffer.clear();
+      this.buffer.position(migOffset);
+      this.buffer.limit(migOffset + keyLen + recLen);
+
+      accSiz += this.buffer.remaining();
+
+      // entries are stacked downwards from the end, the first one ends at the buffer capacity
+      newOffsets[i] = (short) (this.buffer.capacity() - accSiz);
+      tempBuf.position(tempBuf.capacity() - accSiz);
+      tempBuf.put(this.buffer);
+    }
+
+    tempBuf.clear();
+    tempBuf.position(tempBuf.capacity() - accSiz);
+    this.freeAddr = (short) (this.buffer.capacity() - accSiz);
+    // keep the offset array length consistent with the number of records just written
+    this.pairLength = (short) (recordNum * SchemaFileConfig.SEG_OFF_DIG);
+
+    this.buffer.clear();
+    this.buffer.position(SEG_HEADER_SIZE);
+    this.buffer.asShortBuffer().put(newOffsets);
+
+    this.buffer.position(this.freeAddr);
+    this.buffer.put(tempBuf);
+
+    this.syncBuffer();
+  }
+
+  /**
+   * Delegates to {@link RecordUtils#getRecordLength} on the shared buffer. Callers in this class
+   * position the buffer behind the key of the entry before calling.
+   */
+  private short getRecordLength() {
+    return RecordUtils.getRecordLength(this.buffer);
   }
 
+  /**
+   * Looks up the record stored under the given key.
+   *
+   * @param key key of the record
+   * @return node built from the record, {@code null} if the key is not found
+   */
   @Override
   public ICachedMNode getRecordByKey(String key) throws MetadataException {
     // index means order for target node in keyAddressList, NOT aliasKeyList
-    int index = getRecordIndexByKey(key);
+    int index = binarySearchOnKeys(key);
 
     if (index < 0) {
       return null;
     }
 
     ByteBuffer roBuffer = this.buffer.asReadOnlyBuffer(); // for concurrent 
read
-    short offset = getOffsetByKeyIndex(index);
+    short offset = getOffsetByIndex(index);
     roBuffer.clear();
-    roBuffer.position(offset);
+    // skip the int length prefix and the bytes of the key stored in front of the record
+    roBuffer.position(offset + key.getBytes().length + 4);
     short len = RecordUtils.getRecordLength(roBuffer);
-    roBuffer.limit(offset + len);
+    roBuffer.limit(offset + key.getBytes().length + 4 + len);
 
-    return RecordUtils.buffer2Node(keyAddressList.get(index).left, roBuffer);
+    return RecordUtils.buffer2Node(key, roBuffer);
   }
 
   @Override
   public ICachedMNode getRecordByAlias(String alias) throws MetadataException {
-    int ix = getRecordIndexByAlias(alias);
+    int ix = getIndexByAlias(alias);
 
     if (ix < 0) {
       return null;
     }
 
     ByteBuffer rBuffer = this.buffer.asReadOnlyBuffer();
-    short offset = getOffsetByKeyIndex(ix);
-    rBuffer.clear().position(offset).limit(offset + 
RecordUtils.getRecordLength(rBuffer));
-    return RecordUtils.buffer2Node(keyAddressList.get(ix).left, rBuffer);
-  }
-
-  @Override
-  public boolean hasRecordAlias(String alias) {
-    return getRecordIndexByAlias(alias) > -1;
+    short offset = getOffsetByIndex(ix);
+    rBuffer.position(offset);
+    String key = ReadWriteIOUtils.readString(rBuffer);
+    rBuffer.limit(rBuffer.position() + RecordUtils.getRecordLength(rBuffer));
+    return RecordUtils.buffer2Node(key, rBuffer);
   }
 
+  /**
+   * Builds a node from every record of this segment, in the order of the offset array.
+   *
+   * @return queue of all nodes, empty if the segment holds no record
+   */
   @Override
   public Queue<ICachedMNode> getAllRecords() throws MetadataException {
-    Queue<ICachedMNode> res = new ArrayDeque<>(keyAddressList.size());
+    Queue<ICachedMNode> res = new ArrayDeque<>(recordNum);
     ByteBuffer roBuffer = this.buffer.asReadOnlyBuffer();
+
+    // bulk-read the offset array located right behind the segment header
+    short[] offsets = new short[recordNum];
+    roBuffer.position(SchemaFileConfig.SEG_HEADER_SIZE);
+    roBuffer.asShortBuffer().get(offsets);
+
     roBuffer.clear();
-    for (Pair<String, Short> p : keyAddressList) {
-      roBuffer.limit(roBuffer.capacity());
-      roBuffer.position(p.right);
-      short len = RecordUtils.getRecordLength(roBuffer);
-      roBuffer.limit(p.right + len);
-      res.add(RecordUtils.buffer2Node(p.left, roBuffer));
+
+    short len;
+    String key;
+    for (short offset : offsets) {
+      // the limit was narrowed to the previous record, widen it again before seeking
+      roBuffer.clear();
+      roBuffer.position(offset);
+      // the key is stored in front of the record
+      key = ReadWriteIOUtils.readString(roBuffer);
+      len = RecordUtils.getRecordLength(roBuffer);
+      roBuffer.limit(roBuffer.position() + len);
+      res.add(RecordUtils.buffer2Node(key, roBuffer));
     }
     return res;
   }
@@ -224,16 +560,14 @@ public class WrappedSegment extends Segment<ICachedMNode> 
{
   public int updateRecord(String key, ByteBuffer uBuffer)
       throws SegmentOverflowException, RecordDuplicatedException {
 
-    int idx = getRecordIndexByKey(key);
+    int idx = binarySearchOnKeys(key);
     if (idx < 0) {
       return -1;
     }
 
     this.buffer.clear();
     uBuffer.clear();
-    this.buffer.position(keyAddressList.get(idx).right);
-
-    String oriAlias = RecordUtils.getRecordAlias(this.buffer);
+    this.buffer.position(getOffsetByIndex(idx) + 4 + key.getBytes().length);
 
     short oriLen = RecordUtils.getRecordLength(this.buffer);
     short newLen = (short) uBuffer.capacity();
@@ -242,28 +576,21 @@ public class WrappedSegment extends Segment<ICachedMNode> 
{
       this.buffer.limit(this.buffer.position() + oriLen);
       this.buffer.put(uBuffer);
     } else {
-      // allocate new space for record, modify key-address list, freeAddr
-      if (SchemaFileConfig.SEG_HEADER_SIZE + pairLength + newLen > freeAddr) {
+      // allocate new space for record, update offset array, freeAddr
+      if (SchemaFileConfig.SEG_HEADER_SIZE + pairLength + newLen + 4 + 
key.getBytes().length
+          > freeAddr) {
         // not enough space
         throw new SegmentOverflowException(idx);
       }
 
-      freeAddr = (short) (freeAddr - newLen);
+      freeAddr = (short) (freeAddr - newLen - 4 - key.getBytes().length);
       // it will not mark old record as expired
       this.buffer.position(freeAddr);
-      this.buffer.limit(freeAddr + newLen);
-      keyAddressList.get(idx).right = freeAddr;
+      ReadWriteIOUtils.write(key, this.buffer);
       this.buffer.put(uBuffer);
-    }
 
-    // update alias-key list accordingly
-    if (oriAlias != null) {
-      aliasKeyList.remove(binarySearchPairList(aliasKeyList, oriAlias));
-    }
-    uBuffer.clear();
-    String alias = RecordUtils.getRecordAlias(uBuffer);
-    if (alias != null) {
-      aliasKeyList.add(binaryInsertPairList(aliasKeyList, alias), new 
Pair<>(alias, key));
+      this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE + 2 * idx);
+      this.buffer.putShort(freeAddr);
     }
 
     return idx;
@@ -271,85 +598,177 @@ public class WrappedSegment extends 
Segment<ICachedMNode> {
 
   @Override
   public int removeRecord(String key) {
-    int idx = getRecordIndexByKey(key);
+    int idx = binarySearchOnKeys(key);
 
     // deletion only seeks for name of ICacheMNode
     if (idx < 0) {
       return -1;
     }
 
-    this.buffer.clear();
-    this.buffer.position(keyAddressList.get(idx).right);
-
-    // free address pointer forwards if last record removed
-    if (keyAddressList.get(idx).right == freeAddr) {
+    // restore space immediately if the last record removed
+    short offset = getOffsetByIndex(idx);
+    if (offset == freeAddr) {
+      this.buffer.position(offset + 4 + key.getBytes().length);
       short len = RecordUtils.getRecordLength(this.buffer);
-      freeAddr += len;
+      freeAddr += len + 4 + key.getBytes().length;
     }
 
-    // update alias-key list accordingly
-    String alias = RecordUtils.getRecordAlias(this.buffer);
-    if (alias != null && !alias.equals("")) {
-      aliasKeyList.remove(binarySearchPairList(aliasKeyList, alias));
+    // shift offsets forward
+    if (idx != recordNum) {
+      int shift = recordNum - idx;
+      this.buffer.position(SchemaFileConfig.SEG_HEADER_SIZE + idx * 2);
+      ShortBuffer lb = this.buffer.asReadOnlyBuffer().asShortBuffer();
+      lb.get();
+      while (shift != 0) {
+        this.buffer.putShort(lb.get());
+        shift--;
+      }
     }
 
-    // TODO: compact segment further as well
     recordNum--;
     pairLength -= 2;
-    pairLength -= (key.getBytes().length + 4);
-    keyAddressList.remove(idx);
 
     return idx;
   }
 
-  @Override
-  protected short getRecordLength() {
-    return RecordUtils.getRecordLength(this.buffer);
-  }
   // endregion
 
   // region Segment & Record Buffer Operation
 
+  /**
+   * Overwrites the segment address inside the record stored under the given key.
+   *
+   * @param key key of the record to modify
+   * @param newSegAddr segment address to write into the record
+   * @throws IndexOutOfBoundsException if the key is not found, since the -1 returned by the
+   *     search is handed to getOffsetByIndex unchecked
+   */
   protected void updateRecordSegAddr(String key, long newSegAddr) {
-    int index = getRecordIndexByKey(key);
-    short offset = getOffsetByKeyIndex(index);
+    int index = binarySearchOnKeys(key);
+    short offset = getOffsetByIndex(index);
 
     this.buffer.clear();
-    this.buffer.position(offset);
+    // skip the int length prefix and the bytes of the key stored in front of the record
+    this.buffer.position(offset + 4 + key.getBytes().length);
     RecordUtils.updateSegAddr(this.buffer, newSegAddr);
   }
 
   // endregion
 
-  // region Getters of Record Index or Offset
-  /**
-   * To decouple search implementation from other methods Rather than offset 
of the target key,
-   * index could be used to update or remove on keyAddressList
-   *
-   * @param key Record Key
-   * @return index of record, -1 for not found
-   */
-  private int getRecordIndexByKey(String key) {
-    return binarySearchPairList(keyAddressList, key);
+  // region Record Index Access
+  // todo abstract with same name method within Internal
+
+  /**
+   * Reads one slot of the offset array.
+   *
+   * @param index position within the offset array, in [0, recordNum)
+   * @return the short stored in that slot, i.e. the address of the entry within the buffer
+   * @throws IndexOutOfBoundsException if the index is outside of [0, recordNum)
+   */
+  protected short getOffsetByIndex(int index) {
+    boolean inRange = index >= 0 && index < recordNum;
+    if (!inRange) {
+      throw new IndexOutOfBoundsException();
+    }
+    final int slotAddr = SEG_HEADER_SIZE + index * SchemaFileConfig.SEG_OFF_DIG;
+    synchronized (this.buffer) {
+      // widen the limit first, a previous operation may have narrowed it
+      this.buffer.limit(this.buffer.capacity());
+      this.buffer.position(slotAddr);
+      return ReadWriteIOUtils.readShort(this.buffer);
+    }
+  }
+
+  /**
+   * Reads the key stored at the given address of the buffer.
+   *
+   * @param offset address of an entry within the buffer
+   * @return the string read at that address
+   */
+  protected String getKeyByOffset(short offset) {
+    synchronized (this.buffer) {
+      // widen the limit first, a previous operation may have narrowed it
+      this.buffer.limit(this.buffer.capacity());
+      this.buffer.position(offset);
+      String storedKey = ReadWriteIOUtils.readString(this.buffer);
+      return storedKey;
+    }
+  }
+
+  /**
+   * Reads the key of the entry referenced by one slot of the offset array.
+   *
+   * @param index position within the offset array, in [0, recordNum)
+   * @return key of the entry
+   * @throws IndexOutOfBoundsException if the index is outside of [0, recordNum)
+   */
+  private String getKeyByIndex(int index) {
+    boolean inRange = index >= 0 && index < recordNum;
+    if (!inRange) {
+      throw new IndexOutOfBoundsException();
+    }
+    final int slotAddr = SEG_HEADER_SIZE + index * SchemaFileConfig.SEG_OFF_DIG;
+    synchronized (this.buffer) {
+      this.buffer.limit(this.buffer.capacity());
+      this.buffer.position(slotAddr);
+      // the slot holds the address of the entry, which starts with its key
+      short entryAddr = ReadWriteIOUtils.readShort(this.buffer);
+      this.buffer.position(entryAddr);
+      return ReadWriteIOUtils.readString(this.buffer);
+    }
   }
 
   /**
-   * Notice it figures index of the record within {@link #keyAddressList} 
whose alias is target
-   * parameter.
-   *
-   * @param alias
-   * @return index of the record within {@link #keyAddressList}
+   * Binary search for a key among the keys of all records, read through the offset array. Relies
+   * on the keys being in ascending order along the offset array.
+   *
+   * @param key
+   * @return -1 if not existed, otherwise correspondent position of target 
offset
    */
-  private int getRecordIndexByAlias(String alias) {
-    int aliasIndex = binarySearchPairList(aliasKeyList, alias);
-    if (aliasIndex < 0) {
+  protected int binarySearchOnKeys(String key) {
+    int head = 0;
+    int tail = recordNum - 1;
+    // no record at all, or key outside of the range spanned by the first and the last key
+    if (tail < 0
+        || key.compareTo(getKeyByIndex(head)) < 0
+        || key.compareTo(getKeyByIndex(tail)) > 0) {
       return -1;
     }
-    return binarySearchPairList(keyAddressList, 
aliasKeyList.get(aliasIndex).right);
+
+    if (key.compareTo(getKeyByIndex(head)) == 0) {
+      return head;
+    }
+    if (key.compareTo(getKeyByIndex(tail)) == 0) {
+      return tail;
+    }
+
+    // from here on head and tail are exclusive bounds: neither of them holds the key
+    int pivot = (head + tail) / 2;
+    while (key.compareTo(getKeyByIndex(pivot)) != 0) {
+      // pivot coincides with a bound, so no unchecked index is left in between
+      if (head == tail || pivot == head || pivot == tail) {
+        return -1;
+      }
+      if (key.compareTo(getKeyByIndex(pivot)) < 0) {
+        tail = pivot;
+      } else if (key.compareTo(getKeyByIndex(pivot)) > 0) {
+        head = pivot;
+      }
+      pivot = (head + tail) / 2;
+    }
+    return pivot;
   }
 
-  private short getOffsetByKeyIndex(int index) {
-    return keyAddressList.get(index).right;
+  /**
+   * Finds the slot of the offset array at which a record with the given key has to be inserted
+   * to keep the keys in ascending order.
+   *
+   * @param key key of the record to insert
+   * @return target index within the offset array, in [0, recordNum]
+   * @throws RecordDuplicatedException if a record with the same key already exists
+   */
+  protected int binaryInsertOnKeys(String key) throws RecordDuplicatedException {
+    if (recordNum == 0) {
+      return 0;
+    }
+
+    // check the last key first, so that appending in ascending order costs a single key read
+    int cmp = key.compareTo(getKeyByIndex(recordNum - 1));
+    if (cmp == 0) {
+      throw new RecordDuplicatedException(key);
+    }
+    if (cmp > 0) {
+      return recordNum;
+    }
+
+    // key sorts before the last key: lower-bound search among the indices [0, recordNum - 2]
+    int low = 0;
+    int high = recordNum - 2;
+    while (low <= high) {
+      int mid = (low + high) >>> 1;
+      // every getKeyByIndex call decodes a String from the buffer, so compare once per probe
+      cmp = key.compareTo(getKeyByIndex(mid));
+      if (cmp == 0) {
+        throw new RecordDuplicatedException(key);
+      }
+      if (cmp < 0) {
+        high = mid - 1;
+      } else {
+        low = mid + 1;
+      }
+    }
+    // low is the index of the first key greater than the given one
+    return low;
   }
 
   // endregion
@@ -358,6 +777,7 @@ public class WrappedSegment extends Segment<ICachedMNode> {
   public String toString() {
     ByteBuffer bufferR = this.buffer.asReadOnlyBuffer();
     StringBuilder builder = new StringBuilder("");
+    List<Pair<String, Short>> keyAddressList = getKeyOffsetList();
     builder.append(
         String.format(
             "[size: %d, K-AL size: %d, spare:%d,",
@@ -366,7 +786,7 @@ public class WrappedSegment extends Segment<ICachedMNode> {
             freeAddr - pairLength - SchemaFileConfig.SEG_HEADER_SIZE));
     bufferR.clear();
     for (Pair<String, Short> pair : keyAddressList) {
-      bufferR.position(pair.right);
+      bufferR.position(pair.right + 4 + pair.left.getBytes().length);
       if (RecordUtils.getRecordType(bufferR) == 0 || 
RecordUtils.getRecordType(bufferR) == 1) {
         builder.append(
             String.format(
@@ -391,6 +811,8 @@ public class WrappedSegment extends Segment<ICachedMNode> {
       return "";
     }
 
+    List<Pair<String, Short>> keyAddressList = getKeyOffsetList();
+
     // Internal/Entity presents as (name, is_aligned, child_segment_address)
     // Measurement presents as (name, data_type, encoding, compressor, 
alias_if_exist)
     ByteBuffer bufferR = this.buffer.asReadOnlyBuffer();
@@ -403,7 +825,7 @@ public class WrappedSegment extends Segment<ICachedMNode> {
             freeAddr - pairLength - SchemaFileConfig.SEG_HEADER_SIZE));
     bufferR.clear();
     for (Pair<String, Short> pair : keyAddressList) {
-      bufferR.position(pair.right);
+      bufferR.position(pair.right + pair.left.getBytes().length + 4);
       if (RecordUtils.getRecordType(bufferR) == 0 || 
RecordUtils.getRecordType(bufferR) == 1) {
         builder.append(
             String.format(
@@ -457,13 +879,14 @@ public class WrappedSegment extends Segment<ICachedMNode> 
{
   @TestOnly
   public ByteBuffer getRecord(String key) {
     short targetAddr;
-    int idx = getRecordIndexByKey(key);
+    int idx = binarySearchOnKeys(key);
     if (idx >= 0) {
-      targetAddr = keyAddressList.get(idx).right;
+      targetAddr = getOffsetByIndex(idx);
       this.buffer.clear();
       this.buffer.position(targetAddr);
+      this.buffer.position(targetAddr + 4 + this.buffer.getInt());
       short len = RecordUtils.getRecordLength(this.buffer);
-      this.buffer.limit(targetAddr + len);
+      this.buffer.limit(this.buffer.position() + len);
       return this.buffer.slice();
     }
     return null;
@@ -471,7 +894,26 @@ public class WrappedSegment extends Segment<ICachedMNode> {
 
   @TestOnly
   public List<Pair<String, Short>> getKeyOffsetList() {
-    return keyAddressList;
+    short[] offsets = getOffsets();
+    List<Pair<String, Short>> res = new ArrayList<>();
+
+    ByteBuffer buffer = this.buffer.asReadOnlyBuffer();
+    buffer.clear();
+    for (short offset : offsets) {
+      buffer.position(offset);
+      res.add(new Pair<>(ReadWriteIOUtils.readString(buffer), offset));
+    }
+
+    return res;
+  }
+
+  /**
+   * Copies the offset array of this segment.
+   *
+   * @return the {@code recordNum} shorts stored right behind the segment header
+   */
+  @TestOnly
+  public short[] getOffsets() {
+    short[] result = new short[recordNum];
+    // read through a separate view so the shared buffer keeps its position and limit
+    ByteBuffer view = this.buffer.asReadOnlyBuffer();
+    view.clear();
+    view.position(SEG_HEADER_SIZE);
+    view.asShortBuffer().get(result);
+    return result;
   }
 
   // endregion

Reply via email to