This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f1d718666f1 PBTree: Fix Dead Lock and Refactor write/update inteface 
(#11985)
f1d718666f1 is described below

commit f1d718666f100aea13f977cd019ecc7b3b3b894b
Author: ZhaoXin <[email protected]>
AuthorDate: Sat Feb 3 21:17:08 2024 +0800

    PBTree: Fix Dead Lock and Refactor write/update inteface (#11985)
    
    * fix dead lock
    
    * fix uesless code
---
 .../impl/pbtree/schemafile/AliasIndexPage.java     |  3 +-
 .../mtree/impl/pbtree/schemafile/ISegment.java     |  7 ++--
 .../impl/pbtree/schemafile/ISegmentedPage.java     | 11 +++---
 .../mtree/impl/pbtree/schemafile/InternalPage.java |  3 +-
 .../impl/pbtree/schemafile/SegmentedPage.java      | 43 +++++++++++----------
 .../impl/pbtree/schemafile/WrappedSegment.java     | 10 ++---
 .../pbtree/schemafile/pagemgr/PageManager.java     | 45 ++++++++++++++++------
 7 files changed, 71 insertions(+), 51 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
index 47222f2f656..0b2b9ace201 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
@@ -124,8 +124,7 @@ public class AliasIndexPage extends SchemaPage implements 
ISegment<String, Strin
   }
 
   @Override
-  public int updateRecord(String key, String buffer)
-      throws SegmentOverflowException, RecordDuplicatedException {
+  public int updateRecord(String key, String buffer) throws MetadataException {
     return -1;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
index c95a28bb4f1..e8915d68f68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
@@ -20,7 +20,6 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
-import 
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
 
 import java.nio.ByteBuffer;
 import java.util.Queue;
@@ -45,10 +44,10 @@ public interface ISegment<T, R> {
    *
    * @param key name of the record, not the alias
    * @param buffer content of the updated record
-   * @return index of keyAddressList, -1 for not found, exception for space 
run out
-   * @throws SegmentOverflowException if segment runs out of memory
+   * @return -1 for space not enough
+   * @throws for key not found
    */
-  int updateRecord(String key, T buffer) throws SegmentOverflowException, 
RecordDuplicatedException;
+  int updateRecord(String key, T buffer) throws MetadataException;
 
   int removeRecord(String key);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
index 3e0f44db0e1..6aaac8a1abc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
@@ -29,13 +29,12 @@ import java.util.Queue;
 
 /** Interface for a {@link SchemaPage} manages one or more {@link 
WrappedSegment}. */
 public interface ISegmentedPage extends ISchemaPage {
-
   /**
    * Insert a content directly into specified segment. If not enough spare 
within the segment,
-   * reallocate inside the page, or throw exception for new page then.
+   * reallocate inside the page, return negative if not spare enough yet.
    *
-   * @return return 0 if write succeed
-   * @throws SchemaPageOverflowException no spare space inside page
+   * @return 0 for success, positive for page spare increment, negative for 
space run out
+   * @throws MetadataException no spare
    */
   long write(short segIdx, String key, ByteBuffer buffer) throws 
MetadataException;
 
@@ -49,8 +48,10 @@ public interface ISegmentedPage extends ISchemaPage {
    *
    * <p>If segment not enough, it will reallocate in this page first, and 
throw {@link
    * SchemaPageOverflowException} if no more spare space to reallocate.
+   *
+   * @return same as that of {{@link #write}}
    */
-  void update(short segIdx, String key, ByteBuffer buffer) throws 
MetadataException;
+  long update(short segIdx, String key, ByteBuffer buffer) throws 
MetadataException;
 
   Queue<ICachedMNode> getChildren(short segId) throws MetadataException;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
index f1c2d252f84..1466c1a94df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
@@ -140,8 +140,7 @@ public class InternalPage extends SchemaPage implements 
ISegment<Integer, Intege
   }
 
   @Override
-  public int updateRecord(String key, Integer buffer)
-      throws SegmentOverflowException, RecordDuplicatedException {
+  public int updateRecord(String key, Integer buffer) throws MetadataException 
{
     return 0;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
index b5de61e9da6..80db9c08821 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.SegmentNotFoundException;
-import 
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -99,18 +98,22 @@ public class SegmentedPage extends SchemaPage implements 
ISegmentedPage {
     ISegment<ByteBuffer, ICachedMNode> tarSeg = getSegment(segIdx);
 
     if (tarSeg.insertRecord(key, buffer) < 0) {
+      short spare = spareSize;
       // relocate inside page, if not enough space for new size segment, throw 
exception
       tarSeg =
           relocateSegment(
               tarSeg, segIdx, SchemaFile.reEstimateSegSize(tarSeg.size() + 
buffer.capacity()));
-    } else {
-      return 0L;
-    }
+      if (tarSeg == null) {
+        return -1;
+      }
 
-    if (tarSeg.insertRecord(key, buffer) < 0) {
-      throw new MetadataException("failed to insert buffer into new segment");
-    }
+      // relocated but still not enough
+      if (tarSeg.insertRecord(key, buffer) < 0) {
+        throw new MetadataException("failed to insert buffer into relocated 
segment");
+      }
 
+      return spare >= spareSize ? 0L : spareSize - spare;
+    }
     return 0L;
   }
 
@@ -125,24 +128,25 @@ public class SegmentedPage extends SchemaPage implements 
ISegmentedPage {
   }
 
   @Override
-  public void update(short segIdx, String key, ByteBuffer buffer) throws 
MetadataException {
+  public long update(short segIdx, String key, ByteBuffer buffer) throws 
MetadataException {
     ISegment<ByteBuffer, ICachedMNode> seg = getSegment(segIdx);
-    try {
-      if (seg.updateRecord(key, buffer) < 0) {
-        throw new MetadataException("Record to update not found.");
-      }
-    } catch (SegmentOverflowException e) {
-      // relocate large enough to include buffer, throw up if page overflow
+
+    if (seg.updateRecord(key, buffer) < 0) {
+      short spare = spareSize;
       seg =
           relocateSegment(
               seg, segIdx, SchemaFile.reEstimateSegSize(seg.size() + 
buffer.capacity()));
 
-      // retry and throw if failed again
+      if (seg == null) {
+        return -1;
+      }
+
       if (seg.updateRecord(key, buffer) < 0) {
-        throw new MetadataException(
-            String.format("Unknown reason for key [%s] not found in page 
[%d].", key, pageIndex));
+        throw new MetadataException("failed to update buffer upon relocated 
segment");
       }
+      return spare >= spareSize ? 0 : spareSize - spare;
     }
+    return 0L;
   }
 
   @Override
@@ -401,13 +405,12 @@ public class SegmentedPage extends SchemaPage implements 
ISegmentedPage {
    * @param seg original segment instance
    * @param segIdx original segment index
    * @param newSize target segment size
-   * @return reallocated segment instance
-   * @throws SchemaPageOverflowException if this page has no enough space
+   * @return reallocated segment instance, null if no enough spare space
    */
   private ISegment<ByteBuffer, ICachedMNode> relocateSegment(
       ISegment<?, ?> seg, short segIdx, short newSize) throws 
MetadataException {
     if (seg.size() == SchemaFileConfig.SEG_MAX_SIZ || getSpareSize() + 
seg.size() < newSize) {
-      throw new SchemaPageOverflowException(pageIndex);
+      return null;
     }
 
     // try to allocate space directly from spareOffset or rearrange and extend 
in place
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
index ce1065234d4..cc43f619707 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.ColossalRecordException;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
-import 
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -558,12 +557,11 @@ public class WrappedSegment implements 
ISegment<ByteBuffer, ICachedMNode> {
   }
 
   @Override
-  public int updateRecord(String key, ByteBuffer uBuffer)
-      throws SegmentOverflowException, RecordDuplicatedException {
+  public int updateRecord(String key, ByteBuffer uBuffer) throws 
MetadataException {
 
     int idx = binarySearchOnKeys(key);
     if (idx < 0) {
-      return -1;
+      throw new MetadataException(String.format("Record[key:%s] Not Existed.", 
key));
     }
 
     this.buffer.clear();
@@ -580,8 +578,8 @@ public class WrappedSegment implements ISegment<ByteBuffer, 
ICachedMNode> {
       // allocate new space for record, update offset array, freeAddr
       if (SchemaFileConfig.SEG_HEADER_SIZE + pairLength + newLen + 4 + 
key.getBytes().length
           > freeAddr) {
-        // not enough space
-        throw new SegmentOverflowException(idx);
+        // no enough consecutive spare space
+        return -1;
       }
 
       freeAddr = (short) (freeAddr - newLen - 4 - key.getBytes().length);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index ad92a332d57..95a47533ca8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -172,6 +172,7 @@ public abstract class PageManager implements IPageManager {
     int subIndex;
     long curSegAddr = getNodeAddress(node);
     long actualAddress; // actual segment to write record
+    long res; // result of write
     ICachedMNode child;
     ISchemaPage curPage;
     ByteBuffer childBuffer;
@@ -207,10 +208,16 @@ public abstract class PageManager implements IPageManager 
{
       actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
       curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
 
-      try {
-        curPage
-            .getAsSegmentedPage()
-            .write(SchemaFile.getSegIndex(actualAddress), entry.getKey(), 
childBuffer);
+      res =
+          curPage
+              .getAsSegmentedPage()
+              .write(SchemaFile.getSegIndex(actualAddress), entry.getKey(), 
childBuffer);
+      if (res >= 0) {
+        if (res != 0) {
+          // spare size increased, buckets need to update
+          cxt.indexBuckets.sortIntoBucket(curPage, (short) -1);
+        }
+
         interleavedFlush(curPage, cxt);
         cxt.markDirty(curPage);
 
@@ -218,8 +225,9 @@ public abstract class PageManager implements IPageManager {
         if (alias != null && subIndex >= 0) {
           insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
         }
-
-      } catch (SchemaPageOverflowException e) {
+      } else {
+        // page overflow
+        // current page is not enough for coming record
         if 
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
             == SchemaFileConfig.SEG_MAX_SIZ) {
           // curPage might be replaced so unnecessary to mark it here
@@ -273,6 +281,7 @@ public abstract class PageManager implements IPageManager {
     ICachedMNode child, oldChild;
     ISchemaPage curPage;
     ByteBuffer childBuffer;
+    long res; // result of update
     for (Map.Entry<String, ICachedMNode> entry :
         
ICachedMNodeContainer.getCachedMNodeContainer(node).getUpdatedChildBuffer().entrySet())
 {
       child = entry.getValue();
@@ -319,10 +328,15 @@ public abstract class PageManager implements IPageManager 
{
         insertNewSubEntry = removeOldSubEntry = false;
       }
 
-      try {
-        curPage
-            .getAsSegmentedPage()
-            .update(SchemaFile.getSegIndex(actualAddress), entry.getKey(), 
childBuffer);
+      res =
+          curPage
+              .getAsSegmentedPage()
+              .update(SchemaFile.getSegIndex(actualAddress), entry.getKey(), 
childBuffer);
+      if (res >= 0) {
+        if (res != 0) {
+          // spare size increased, buckets need to update
+          cxt.indexBuckets.sortIntoBucket(curPage, (short) -1);
+        }
         interleavedFlush(curPage, cxt);
         cxt.markDirty(curPage);
 
@@ -336,7 +350,8 @@ public abstract class PageManager implements IPageManager {
             insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
           }
         }
-      } catch (SchemaPageOverflowException e) {
+      } else {
+        // page overflow
         if 
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
             == SchemaFileConfig.SEG_MAX_SIZ) {
           multiPageUpdateOverflowOperation(curPage, entry.getKey(), 
childBuffer, cxt);
@@ -544,7 +559,13 @@ public abstract class PageManager implements IPageManager {
   private long preAllocateSegment(short size, SchemaPageContext cxt)
       throws IOException, MetadataException {
     ISegmentedPage page = getMinApplSegmentedPageInMem(size, cxt);
-    return SchemaFile.getGlobalIndex(page.getPageIndex(), 
page.allocNewSegment(size));
+    short sparePrev = page.getSpareSize();
+    long res = SchemaFile.getGlobalIndex(page.getPageIndex(), 
page.allocNewSegment(size));
+    if (sparePrev < page.getSpareSize()) {
+      // a compaction trigger by allocNewSegment had increased spare size
+      cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+    }
+    return res;
   }
 
   protected ISchemaPage replacePageInCache(ISchemaPage page, SchemaPageContext 
cxt) {

Reply via email to