This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f1d718666f1 PBTree: Fix Dead Lock and Refactor write/update interface
(#11985)
f1d718666f1 is described below
commit f1d718666f100aea13f977cd019ecc7b3b3b894b
Author: ZhaoXin <[email protected]>
AuthorDate: Sat Feb 3 21:17:08 2024 +0800
PBTree: Fix Dead Lock and Refactor write/update interface (#11985)
* fix dead lock
* fix useless code
---
.../impl/pbtree/schemafile/AliasIndexPage.java | 3 +-
.../mtree/impl/pbtree/schemafile/ISegment.java | 7 ++--
.../impl/pbtree/schemafile/ISegmentedPage.java | 11 +++---
.../mtree/impl/pbtree/schemafile/InternalPage.java | 3 +-
.../impl/pbtree/schemafile/SegmentedPage.java | 43 +++++++++++----------
.../impl/pbtree/schemafile/WrappedSegment.java | 10 ++---
.../pbtree/schemafile/pagemgr/PageManager.java | 45 ++++++++++++++++------
7 files changed, 71 insertions(+), 51 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
index 47222f2f656..0b2b9ace201 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/AliasIndexPage.java
@@ -124,8 +124,7 @@ public class AliasIndexPage extends SchemaPage implements
ISegment<String, Strin
}
@Override
- public int updateRecord(String key, String buffer)
- throws SegmentOverflowException, RecordDuplicatedException {
+ public int updateRecord(String key, String buffer) throws MetadataException {
return -1;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
index c95a28bb4f1..e8915d68f68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegment.java
@@ -20,7 +20,6 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
-import
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
import java.nio.ByteBuffer;
import java.util.Queue;
@@ -45,10 +44,10 @@ public interface ISegment<T, R> {
*
* @param key name of the record, not the alias
* @param buffer content of the updated record
- * @return index of keyAddressList, -1 for not found, exception for space
run out
- * @throws SegmentOverflowException if segment runs out of memory
+ * @return -1 for space not enough
+ * @throws for key not found
*/
- int updateRecord(String key, T buffer) throws SegmentOverflowException,
RecordDuplicatedException;
+ int updateRecord(String key, T buffer) throws MetadataException;
int removeRecord(String key);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
index 3e0f44db0e1..6aaac8a1abc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/ISegmentedPage.java
@@ -29,13 +29,12 @@ import java.util.Queue;
/** Interface for a {@link SchemaPage} manages one or more {@link
WrappedSegment}. */
public interface ISegmentedPage extends ISchemaPage {
-
/**
* Insert a content directly into specified segment. If not enough spare
within the segment,
- * reallocate inside the page, or throw exception for new page then.
+ * reallocate inside the page, return negative if not spare enough yet.
*
- * @return return 0 if write succeed
- * @throws SchemaPageOverflowException no spare space inside page
+ * @return 0 for success, positive for page spare increment, negative for
space run out
+ * @throws MetadataException no spare
*/
long write(short segIdx, String key, ByteBuffer buffer) throws
MetadataException;
@@ -49,8 +48,10 @@ public interface ISegmentedPage extends ISchemaPage {
*
* <p>If segment not enough, it will reallocate in this page first, and
throw {@link
* SchemaPageOverflowException} if no more spare space to reallocate.
+ *
+ * @return same as that of {{@link #write}}
*/
- void update(short segIdx, String key, ByteBuffer buffer) throws
MetadataException;
+ long update(short segIdx, String key, ByteBuffer buffer) throws
MetadataException;
Queue<ICachedMNode> getChildren(short segId) throws MetadataException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
index f1c2d252f84..1466c1a94df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/InternalPage.java
@@ -140,8 +140,7 @@ public class InternalPage extends SchemaPage implements
ISegment<Integer, Intege
}
@Override
- public int updateRecord(String key, Integer buffer)
- throws SegmentOverflowException, RecordDuplicatedException {
+ public int updateRecord(String key, Integer buffer) throws MetadataException
{
return 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
index b5de61e9da6..80db9c08821 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SegmentedPage.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
import
org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
import
org.apache.iotdb.db.exception.metadata.schemafile.SegmentNotFoundException;
-import
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -99,18 +98,22 @@ public class SegmentedPage extends SchemaPage implements
ISegmentedPage {
ISegment<ByteBuffer, ICachedMNode> tarSeg = getSegment(segIdx);
if (tarSeg.insertRecord(key, buffer) < 0) {
+ short spare = spareSize;
// relocate inside page, if not enough space for new size segment, throw
exception
tarSeg =
relocateSegment(
tarSeg, segIdx, SchemaFile.reEstimateSegSize(tarSeg.size() +
buffer.capacity()));
- } else {
- return 0L;
- }
+ if (tarSeg == null) {
+ return -1;
+ }
- if (tarSeg.insertRecord(key, buffer) < 0) {
- throw new MetadataException("failed to insert buffer into new segment");
- }
+ // relocated but still not enough
+ if (tarSeg.insertRecord(key, buffer) < 0) {
+ throw new MetadataException("failed to insert buffer into relocated
segment");
+ }
+ return spare >= spareSize ? 0L : spareSize - spare;
+ }
return 0L;
}
@@ -125,24 +128,25 @@ public class SegmentedPage extends SchemaPage implements
ISegmentedPage {
}
@Override
- public void update(short segIdx, String key, ByteBuffer buffer) throws
MetadataException {
+ public long update(short segIdx, String key, ByteBuffer buffer) throws
MetadataException {
ISegment<ByteBuffer, ICachedMNode> seg = getSegment(segIdx);
- try {
- if (seg.updateRecord(key, buffer) < 0) {
- throw new MetadataException("Record to update not found.");
- }
- } catch (SegmentOverflowException e) {
- // relocate large enough to include buffer, throw up if page overflow
+
+ if (seg.updateRecord(key, buffer) < 0) {
+ short spare = spareSize;
seg =
relocateSegment(
seg, segIdx, SchemaFile.reEstimateSegSize(seg.size() +
buffer.capacity()));
- // retry and throw if failed again
+ if (seg == null) {
+ return -1;
+ }
+
if (seg.updateRecord(key, buffer) < 0) {
- throw new MetadataException(
- String.format("Unknown reason for key [%s] not found in page
[%d].", key, pageIndex));
+ throw new MetadataException("failed to update buffer upon relocated
segment");
}
+ return spare >= spareSize ? 0 : spareSize - spare;
}
+ return 0L;
}
@Override
@@ -401,13 +405,12 @@ public class SegmentedPage extends SchemaPage implements
ISegmentedPage {
* @param seg original segment instance
* @param segIdx original segment index
* @param newSize target segment size
- * @return reallocated segment instance
- * @throws SchemaPageOverflowException if this page has no enough space
+ * @return reallocated segment instance, null if no enough spare space
*/
private ISegment<ByteBuffer, ICachedMNode> relocateSegment(
ISegment<?, ?> seg, short segIdx, short newSize) throws
MetadataException {
if (seg.size() == SchemaFileConfig.SEG_MAX_SIZ || getSpareSize() +
seg.size() < newSize) {
- throw new SchemaPageOverflowException(pageIndex);
+ return null;
}
// try to allocate space directly from spareOffset or rearrange and extend
in place
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
index ce1065234d4..cc43f619707 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/WrappedSegment.java
@@ -23,7 +23,6 @@ import
org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.exception.metadata.schemafile.ColossalRecordException;
import
org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
-import
org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -558,12 +557,11 @@ public class WrappedSegment implements
ISegment<ByteBuffer, ICachedMNode> {
}
@Override
- public int updateRecord(String key, ByteBuffer uBuffer)
- throws SegmentOverflowException, RecordDuplicatedException {
+ public int updateRecord(String key, ByteBuffer uBuffer) throws
MetadataException {
int idx = binarySearchOnKeys(key);
if (idx < 0) {
- return -1;
+ throw new MetadataException(String.format("Record[key:%s] Not Existed.",
key));
}
this.buffer.clear();
@@ -580,8 +578,8 @@ public class WrappedSegment implements ISegment<ByteBuffer,
ICachedMNode> {
// allocate new space for record, update offset array, freeAddr
if (SchemaFileConfig.SEG_HEADER_SIZE + pairLength + newLen + 4 +
key.getBytes().length
> freeAddr) {
- // not enough space
- throw new SegmentOverflowException(idx);
+ // no enough consecutive spare space
+ return -1;
}
freeAddr = (short) (freeAddr - newLen - 4 - key.getBytes().length);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index ad92a332d57..95a47533ca8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -172,6 +172,7 @@ public abstract class PageManager implements IPageManager {
int subIndex;
long curSegAddr = getNodeAddress(node);
long actualAddress; // actual segment to write record
+ long res; // result of write
ICachedMNode child;
ISchemaPage curPage;
ByteBuffer childBuffer;
@@ -207,10 +208,16 @@ public abstract class PageManager implements IPageManager
{
actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
- try {
- curPage
- .getAsSegmentedPage()
- .write(SchemaFile.getSegIndex(actualAddress), entry.getKey(),
childBuffer);
+ res =
+ curPage
+ .getAsSegmentedPage()
+ .write(SchemaFile.getSegIndex(actualAddress), entry.getKey(),
childBuffer);
+ if (res >= 0) {
+ if (res != 0) {
+ // spare size increased, buckets need to update
+ cxt.indexBuckets.sortIntoBucket(curPage, (short) -1);
+ }
+
interleavedFlush(curPage, cxt);
cxt.markDirty(curPage);
@@ -218,8 +225,9 @@ public abstract class PageManager implements IPageManager {
if (alias != null && subIndex >= 0) {
insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
}
-
- } catch (SchemaPageOverflowException e) {
+ } else {
+ // page overflow
+ // current page is not enough for coming record
if
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
== SchemaFileConfig.SEG_MAX_SIZ) {
// curPage might be replaced so unnecessary to mark it here
@@ -273,6 +281,7 @@ public abstract class PageManager implements IPageManager {
ICachedMNode child, oldChild;
ISchemaPage curPage;
ByteBuffer childBuffer;
+ long res; // result of update
for (Map.Entry<String, ICachedMNode> entry :
ICachedMNodeContainer.getCachedMNodeContainer(node).getUpdatedChildBuffer().entrySet())
{
child = entry.getValue();
@@ -319,10 +328,15 @@ public abstract class PageManager implements IPageManager
{
insertNewSubEntry = removeOldSubEntry = false;
}
- try {
- curPage
- .getAsSegmentedPage()
- .update(SchemaFile.getSegIndex(actualAddress), entry.getKey(),
childBuffer);
+ res =
+ curPage
+ .getAsSegmentedPage()
+ .update(SchemaFile.getSegIndex(actualAddress), entry.getKey(),
childBuffer);
+ if (res >= 0) {
+ if (res != 0) {
+ // spare size increased, buckets need to update
+ cxt.indexBuckets.sortIntoBucket(curPage, (short) -1);
+ }
interleavedFlush(curPage, cxt);
cxt.markDirty(curPage);
@@ -336,7 +350,8 @@ public abstract class PageManager implements IPageManager {
insertSubIndexEntry(subIndex, alias, entry.getKey(), cxt);
}
}
- } catch (SchemaPageOverflowException e) {
+ } else {
+ // page overflow
if
(curPage.getAsSegmentedPage().getSegmentSize(SchemaFile.getSegIndex(actualAddress))
== SchemaFileConfig.SEG_MAX_SIZ) {
multiPageUpdateOverflowOperation(curPage, entry.getKey(),
childBuffer, cxt);
@@ -544,7 +559,13 @@ public abstract class PageManager implements IPageManager {
private long preAllocateSegment(short size, SchemaPageContext cxt)
throws IOException, MetadataException {
ISegmentedPage page = getMinApplSegmentedPageInMem(size, cxt);
- return SchemaFile.getGlobalIndex(page.getPageIndex(),
page.allocNewSegment(size));
+ short sparePrev = page.getSpareSize();
+ long res = SchemaFile.getGlobalIndex(page.getPageIndex(),
page.allocNewSegment(size));
+ if (sparePrev < page.getSpareSize()) {
+ // a compaction trigger by allocNewSegment had increased spare size
+ cxt.indexBuckets.sortIntoBucket(page, (short) -1);
+ }
+ return res;
}
protected ISchemaPage replacePageInCache(ISchemaPage page, SchemaPageContext
cxt) {