This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch pbtree_concurrent
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pbtree_concurrent by this push:
new eb1aa3f1322 Introduce Node-Level lock to PBTree cache management
(#11633)
eb1aa3f1322 is described below
commit eb1aa3f1322c6487b821e449c54486c7ea58c1db
Author: Marcos_Zyk <[email protected]>
AuthorDate: Wed Dec 13 10:40:56 2023 +0800
Introduce Node-Level lock to PBTree cache management (#11633)
Introduce Node-Level lock to PBTree cache management
---
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 20 +-
.../mtree/impl/pbtree/CachedMTreeStore.java | 157 +++++--
.../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java | 62 +--
.../pbtree/ReentrantReadOnlyCachedMTreeStore.java | 4 +-
.../mtree/impl/pbtree/cache/CacheEntry.java | 22 +-
.../mtree/impl/pbtree/cache/CacheManager.java | 502 ++++++++-------------
.../impl/pbtree/cache/CacheMemoryManager.java | 22 +-
.../mtree/impl/pbtree/cache/ICacheManager.java | 8 +
.../CacheMNodeInfo.java => cache/INodeBuffer.java} | 41 +-
.../mtree/impl/pbtree/cache/LRUCacheManager.java | 45 +-
.../mtree/impl/pbtree/cache/NodeBuffer.java | 216 +++++++++
.../mtree/impl/pbtree/cache/PlainCacheManager.java | 17 +-
.../impl/pbtree/flush/PBTreeFlushExecutor.java | 41 +-
.../{cache/CacheEntry.java => lock/LockEntry.java} | 29 +-
.../mtree/impl/pbtree/lock/LockManager.java | 147 ++++++
.../{ => lock}/StampedWriterPreferredLock.java | 26 +-
.../mtree/impl/pbtree/mnode/ICachedMNode.java | 5 +
.../impl/pbtree/mnode/basic/CachedBasicMNode.java | 11 +
.../mnode/container/CachedMNodeContainer.java | 8 +-
.../mnode/impl/CachedAboveDatabaseMNode.java | 11 +
.../pbtree/mnode/impl/CachedDatabaseMNode.java | 11 +
.../pbtree/mnode/impl/CachedMeasurementMNode.java | 11 +
.../impl/pbtree/mnode/info/CacheMNodeInfo.java | 13 +-
.../impl/pbtree/schemafile/MockSchemaFile.java | 20 +-
.../mtree/lock/StampedWriterPreferredLockTest.java | 16 +-
.../schemaRegion/SchemaStatisticsTest.java | 5 +-
26 files changed, 940 insertions(+), 530 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index ffb8cf41052..6438f91f716 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -503,9 +503,11 @@ public class MTreeBelowSGMemoryImpl {
IMeasurementMNode<IMemMNode> deletedNode = getMeasurementMNode(path);
IMemMNode parent = deletedNode.getParent();
// delete the last node of path
- store.deleteChild(parent, path.getMeasurement());
- if (deletedNode.getAlias() != null) {
- parent.getAsDeviceMNode().deleteAliasChild(deletedNode.getAlias());
+ synchronized (this) {
+ store.deleteChild(parent, path.getMeasurement());
+ if (deletedNode.getAlias() != null) {
+ parent.getAsDeviceMNode().deleteAliasChild(deletedNode.getAlias());
+ }
}
deleteEmptyInternalMNode(parent.getAsDeviceMNode());
return deletedNode;
@@ -541,13 +543,19 @@ public class MTreeBelowSGMemoryImpl {
}
// delete all empty ancestors except database and MeasurementMNode
- while (isEmptyInternalMNode(curNode)) {
+ while (true) {
// if current database has no time series, return the database name
if (curNode.isDatabase()) {
return;
}
- store.deleteChild(curNode.getParent(), curNode.getName());
- curNode = curNode.getParent();
+
+ synchronized (this) {
+ if (!isEmptyInternalMNode(curNode)) {
+ break;
+ }
+ store.deleteChild(curNode.getParent(), curNode.getName());
+ curNode = curNode.getParent();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 0f6e921e16e..d6ec24cb118 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -34,11 +34,13 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.iterat
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush.PBTreeFlushExecutor;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.iterator.CachedTraverserIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.MockSchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MNodeUtils;
@@ -69,7 +71,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
private final IMNodeFactory<ICachedMNode> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
private final CachedSchemaRegionStatistics regionStatistics;
- private final StampedWriterPreferredLock lock = new
StampedWriterPreferredLock();
+ private final LockManager lockManager = new LockManager();
public CachedMTreeStore(
PartialPath storageGroup,
@@ -78,12 +80,14 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
Runnable flushCallback)
throws MetadataException, IOException {
this.schemaRegionId = schemaRegionId;
- file = SchemaFile.initSchemaFile(storageGroup.getFullPath(),
schemaRegionId);
+ // file = SchemaFile.initSchemaFile(storageGroup.getFullPath(),
schemaRegionId);
+ file = new MockSchemaFile(storageGroup);
root = file.init();
this.regionStatistics = regionStatistics;
this.memManager = new MemManager(regionStatistics);
this.flushCallback = flushCallback;
- this.cacheManager =
CacheMemoryManager.getInstance().createLRUCacheManager(this, memManager);
+ this.cacheManager =
+ CacheMemoryManager.getInstance().createLRUCacheManager(this,
memManager, lockManager);
cacheManager.initRootStatus(root);
regionStatistics.setCacheManager(cacheManager);
ensureMemoryStatus();
@@ -113,25 +117,32 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
@Override
public boolean hasChild(ICachedMNode parent, String name) throws
MetadataException {
- return hasChild(parent, name, true);
+ return hasChild(parent, name, true, true);
}
- protected final boolean hasChild(ICachedMNode parent, String name, boolean
needLock)
+ protected final boolean hasChild(
+ ICachedMNode parent, String name, boolean needGlobalLock, boolean
needNodeLock)
throws MetadataException {
- if (needLock) {
- lock.threadReadLock();
+ if (needGlobalLock) {
+ lockManager.globalReadLock();
+ }
+ if (needNodeLock) {
+ lockManager.threadReadLock(parent);
}
try {
- ICachedMNode child = getChild(parent, name, needLock);
+ ICachedMNode child = getChild(parent, name, false, false);
if (child == null) {
return false;
} else {
- unPin(child);
+ unPin(child, false);
return true;
}
} finally {
- if (needLock) {
- lock.threadReadUnlock();
+ if (needNodeLock) {
+ lockManager.threadReadUnlock(parent);
+ }
+ if (needGlobalLock) {
+ lockManager.globalReadUnlock();
}
}
}
@@ -150,13 +161,17 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
*/
@Override
public ICachedMNode getChild(ICachedMNode parent, String name) throws
MetadataException {
- return getChild(parent, name, true);
+ return getChild(parent, name, true, true);
}
- protected final ICachedMNode getChild(ICachedMNode parent, String name,
boolean needLock)
+ protected final ICachedMNode getChild(
+ ICachedMNode parent, String name, boolean needGlobalLock, boolean
needNodeLock)
throws MetadataException {
- if (needLock) {
- lock.threadReadLock();
+ if (needGlobalLock) {
+ lockManager.globalReadLock();
+ }
+ if (needNodeLock) {
+ lockManager.threadReadLock(parent);
}
try {
ICachedMNode node = parent.getChild(name);
@@ -175,8 +190,11 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
return node;
} finally {
- if (needLock) {
- lock.threadReadUnlock();
+ if (needNodeLock) {
+ lockManager.threadReadUnlock(parent);
+ }
+ if (needGlobalLock) {
+ lockManager.globalReadUnlock();
}
}
}
@@ -266,14 +284,16 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
// must pin parent first
@Override
public ICachedMNode addChild(ICachedMNode parent, String childName,
ICachedMNode child) {
- lock.threadReadLock();
+ lockManager.globalReadLock();
+ lockManager.threadReadLock(parent);
try {
child.setParent(parent);
cacheManager.updateCacheStatusAfterAppend(child);
ensureMemoryStatus();
return parent.getChild(childName);
} finally {
- lock.threadReadUnlock();
+ lockManager.threadReadUnlock(parent);
+ lockManager.globalReadUnlock();
}
}
@@ -290,9 +310,9 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
*/
@Override
public void deleteChild(ICachedMNode parent, String childName) throws
MetadataException {
- lock.writeLock();
+ lockManager.globalWriteLock();
try {
- ICachedMNode deletedMNode = getChild(parent, childName, false);
+ ICachedMNode deletedMNode = getChild(parent, childName, false, false);
ICachedMNodeContainer container =
ICachedMNodeContainer.getCachedMNodeContainer(parent);
if (!container.isVolatile() &&
!container.hasChildInNewChildBuffer(childName)) {
// the container has been persisted and this child is not a new child,
which means the child
@@ -307,7 +327,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
parent.deleteChild(childName);
cacheManager.remove(deletedMNode);
} finally {
- lock.unlockWrite();
+ lockManager.globalWriteUnlock();
}
}
@@ -324,14 +344,20 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
final void updateMNode(ICachedMNode node, Consumer<ICachedMNode> operation,
boolean needLock) {
if (needLock) {
- lock.threadReadLock();
+ lockManager.globalReadLock();
+ }
+ if (!node.isDatabase()) {
+ lockManager.threadReadLock(node.getParent(), true);
}
try {
operation.accept(node);
cacheManager.updateCacheStatusAfterUpdate(node);
} finally {
+ if (!node.isDatabase()) {
+ lockManager.threadReadUnlock(node.getParent());
+ }
if (needLock) {
- lock.threadReadUnlock();
+ lockManager.globalReadUnlock();
}
}
}
@@ -405,14 +431,24 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
final void pin(ICachedMNode node, boolean needLock) throws MetadataException
{
+ if (node.getParent() == null) {
+ // ignore node represented by template
+ return;
+ }
if (needLock) {
- lock.threadReadLock();
+ lockManager.globalReadLock();
+ }
+ if (!node.isDatabase()) {
+ lockManager.threadReadLock(node.getParent());
}
try {
cacheManager.pinMNode(node);
} finally {
+ if (!node.isDatabase()) {
+ lockManager.threadReadUnlock(node.getParent());
+ }
if (needLock) {
- lock.threadReadUnlock();
+ lockManager.globalReadUnlock();
}
}
}
@@ -431,16 +467,26 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
final void unPin(ICachedMNode node, boolean needLock) {
+ if (node.getParent() == null) {
+ // ignore node represented by template
+ return;
+ }
if (needLock) {
- lock.threadReadLock();
+ lockManager.globalReadLock();
+ }
+ if (!node.isDatabase()) {
+ lockManager.threadReadLock(node.getParent(), true);
}
try {
if (cacheManager.unPinMNode(node)) {
ensureMemoryStatus();
}
} finally {
+ if (!node.isDatabase()) {
+ lockManager.threadReadUnlock(node.getParent());
+ }
if (needLock) {
- lock.threadReadUnlock();
+ lockManager.globalReadUnlock();
}
}
}
@@ -458,11 +504,11 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
final long stampedReadLock() {
- return lock.stampedReadLock();
+ return lockManager.globalStampedReadLock();
}
final void stampedReadUnlock(long stamp) {
- lock.stampedReadUnlock(stamp);
+ lockManager.globalStampedReadUnlock(stamp);
}
@Override
@@ -473,7 +519,7 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
/** clear all the data of MTreeStore in memory and disk. */
@Override
public void clear() {
- lock.writeLock();
+ lockManager.globalWriteLock();
try {
CacheMemoryManager.getInstance().clearCachedMTreeStore(this);
regionStatistics.setCacheManager(null);
@@ -489,19 +535,18 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
file = null;
} finally {
- lock.unlockWrite();
+ lockManager.globalWriteUnlock();
}
}
@Override
public boolean createSnapshot(File snapshotDir) {
- lock.writeLock();
+ lockManager.globalWriteLock();
try {
- flushVolatileNodes();
- ensureMemoryStatus();
+ flushVolatileNodes(false);
return file.createSnapshot(snapshotDir);
} finally {
- lock.unlockWrite();
+ lockManager.globalWriteUnlock();
}
}
@@ -529,7 +574,8 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
this.regionStatistics = regionStatistics;
this.memManager = new MemManager(regionStatistics);
this.flushCallback = flushCallback;
- this.cacheManager =
CacheMemoryManager.getInstance().createLRUCacheManager(this, memManager);
+ this.cacheManager =
+ CacheMemoryManager.getInstance().createLRUCacheManager(this,
memManager, lockManager);
cacheManager.initRootStatus(root);
regionStatistics.setCacheManager(cacheManager);
ensureMemoryStatus();
@@ -539,10 +585,6 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
CacheMemoryManager.getInstance().ensureMemoryStatus();
}
- public StampedWriterPreferredLock getLock() {
- return lock;
- }
-
public CachedSchemaRegionStatistics getRegionStatistics() {
return regionStatistics;
}
@@ -561,7 +603,10 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
/** Sync all volatile nodes to PBTree and execute memory release after
flush. */
- public void flushVolatileNodes() {
+ public void flushVolatileNodes(boolean needLock) {
+ if (needLock) {
+ lockManager.globalReadLock();
+ }
try {
boolean hasVolatileNodes = flushVolatileDBNode();
@@ -575,7 +620,8 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
PBTreeFlushExecutor flushExecutor;
while (volatileSubtrees.hasNext()) {
subtreeRoot = volatileSubtrees.next();
- flushExecutor = new PBTreeFlushExecutor(subtreeRoot, cacheManager,
file);
+ flushExecutor =
+ new PBTreeFlushExecutor(subtreeRoot, needLock, cacheManager,
file, lockManager);
flushExecutor.flushVolatileNodes();
}
@@ -590,6 +636,8 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
if (hasVolatileNodes) {
flushCallback.run();
}
+
+ ensureMemoryStatus();
} catch (MetadataException | IOException e) {
LOGGER.warn(
"Exception occurred during MTree flush, current SchemaRegionId is
{}", schemaRegionId, e);
@@ -597,6 +645,10 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
LOGGER.error(
"Error occurred during MTree flush, current SchemaRegionId is {}",
schemaRegionId, e);
e.printStackTrace();
+ } finally {
+ if (needLock) {
+ lockManager.globalReadUnlock();
+ }
}
}
@@ -629,13 +681,19 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
Iterator<ICachedMNode> bufferIterator;
boolean isIteratingDisk;
ICachedMNode nextNode;
+
+ boolean needLock;
boolean isLocked;
+ long readLockStamp;
+
CachedMNodeIterator(ICachedMNode parent, boolean needLock)
throws MetadataException, IOException {
+ this.needLock = needLock;
if (needLock) {
- lock.threadReadLock();
+ lockManager.globalReadLock();
}
+ readLockStamp = lockManager.stampedReadLock(parent);
isLocked = true;
try {
this.parent = parent;
@@ -650,8 +708,9 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
} catch (Throwable e) {
+ lockManager.stampedReadUnlock(parent, readLockStamp);
if (needLock) {
- lock.threadReadUnlock();
+ lockManager.globalReadUnlock();
}
isLocked = false;
throw e;
@@ -742,12 +801,16 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
public void close() {
try {
if (nextNode != null) {
- unPin(nextNode);
+ unPin(nextNode, false);
nextNode = null;
}
} finally {
if (isLocked) {
- lock.threadReadUnlock();
+ lockManager.stampedReadUnlock(parent, readLockStamp);
+ if (needLock) {
+ lockManager.globalReadUnlock();
+ }
+ isLocked = false;
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
index 7b695f5e0e6..106638876cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
@@ -587,9 +587,11 @@ public class MTreeBelowSGCachedImpl {
IMeasurementMNode<ICachedMNode> deletedNode = getMeasurementMNode(path);
ICachedMNode parent = deletedNode.getParent();
// delete the last node of path
- store.deleteChild(parent, path.getMeasurement());
- if (deletedNode.getAlias() != null) {
- parent.getAsDeviceMNode().deleteAliasChild(deletedNode.getAlias());
+ synchronized (this) {
+ store.deleteChild(parent, path.getMeasurement());
+ if (deletedNode.getAlias() != null) {
+ parent.getAsDeviceMNode().deleteAliasChild(deletedNode.getAlias());
+ }
}
deleteAndUnpinEmptyInternalMNode(parent.getAsDeviceMNode());
return deletedNode;
@@ -605,44 +607,50 @@ public class MTreeBelowSGCachedImpl {
throws MetadataException {
ICachedMNode curNode = entityMNode.getAsMNode();
if (!entityMNode.isUseTemplate()) {
- boolean hasMeasurement = false;
- boolean hasNonViewMeasurement = false;
- ICachedMNode child;
- IMNodeIterator<ICachedMNode> iterator =
store.getChildrenIterator(curNode);
- try {
- while (iterator.hasNext()) {
- child = iterator.next();
- unPinMNode(child);
- if (child.isMeasurement()) {
- hasMeasurement = true;
- if (!child.getAsMeasurementMNode().isLogicalView()) {
- hasNonViewMeasurement = true;
- break;
+ synchronized (this) {
+ boolean hasMeasurement = false;
+ boolean hasNonViewMeasurement = false;
+ ICachedMNode child;
+ IMNodeIterator<ICachedMNode> iterator =
store.getChildrenIterator(curNode);
+ try {
+ while (iterator.hasNext()) {
+ child = iterator.next();
+ unPinMNode(child);
+ if (child.isMeasurement()) {
+ hasMeasurement = true;
+ if (!child.getAsMeasurementMNode().isLogicalView()) {
+ hasNonViewMeasurement = true;
+ break;
+ }
}
}
+ } finally {
+ iterator.close();
}
- } finally {
- iterator.close();
- }
- if (!hasMeasurement) {
- synchronized (this) {
+ if (!hasMeasurement) {
curNode = store.setToInternal(entityMNode);
+ } else if (!hasNonViewMeasurement) {
+ // has some measurement but they are all logical view
+ store.updateMNode(entityMNode.getAsMNode(), o ->
o.getAsDeviceMNode().setAligned(null));
}
- } else if (!hasNonViewMeasurement) {
- // has some measurement but they are all logical view
- store.updateMNode(entityMNode.getAsMNode(), o ->
o.getAsDeviceMNode().setAligned(null));
}
}
// delete all empty ancestors except database and MeasurementMNode
- while (isEmptyInternalMNode(curNode)) {
+ while (true) {
// if current database has no time series, return the database name
if (curNode.isDatabase()) {
return;
}
- store.deleteChild(curNode.getParent(), curNode.getName());
- curNode = curNode.getParent();
+
+ synchronized (this) {
+ if (!isEmptyInternalMNode(curNode)) {
+ break;
+ }
+ store.deleteChild(curNode.getParent(), curNode.getName());
+ curNode = curNode.getParent();
+ }
}
unPinMNode(curNode);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
index 6daa9ff0686..969bcdcfce4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/ReentrantReadOnlyCachedMTreeStore.java
@@ -52,12 +52,12 @@ public class ReentrantReadOnlyCachedMTreeStore implements
IMTreeStore<ICachedMNo
@Override
public boolean hasChild(ICachedMNode parent, String name) throws
MetadataException {
- return store.hasChild(parent, name, false);
+ return store.hasChild(parent, name, false, true);
}
@Override
public ICachedMNode getChild(ICachedMNode parent, String name) throws
MetadataException {
- return store.getChild(parent, name, false);
+ return store.getChild(parent, name, false, true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
index 1a686132571..f16e20972d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
@@ -24,7 +24,9 @@ public class CacheEntry {
private volatile boolean isVolatile = false;
- private final AtomicInteger semaphore = new AtomicInteger();
+ private final AtomicInteger pinSemaphore = new AtomicInteger(0);
+
+ private final AtomicInteger volatileDescendantSemaphore = new
AtomicInteger(0);
public boolean isVolatile() {
return isVolatile;
@@ -35,14 +37,26 @@ public class CacheEntry {
}
public void pin() {
- semaphore.getAndIncrement();
+ pinSemaphore.getAndIncrement();
}
public void unPin() {
- semaphore.getAndDecrement();
+ pinSemaphore.getAndDecrement();
}
public boolean isPinned() {
- return semaphore.get() > 0;
+ return pinSemaphore.get() > 0;
+ }
+
+ public void incVolatileDescendant() {
+ volatileDescendantSemaphore.getAndIncrement();
+ }
+
+ public void decVolatileDescendant() {
+ volatileDescendantSemaphore.getAndDecrement();
+ }
+
+ public boolean hasVolatileDescendant() {
+ return volatileDescendantSemaphore.get() > 0;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheManager.java
index b8c055fd4c3..8724f8d4413 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheManager.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.db.exception.metadata.cache.MNodeNotCachedException;
import org.apache.iotdb.db.exception.metadata.cache.MNodeNotPinnedException;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -28,9 +29,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.con
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getBelongedContainer;
import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getCachedMNodeContainer;
@@ -64,13 +63,16 @@ import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mn
*/
public abstract class CacheManager implements ICacheManager {
+ private final LockManager lockManager;
+
private final MemManager memManager;
// The nodeBuffer helps to quickly locate the volatile subtree
- private final NodeBuffer nodeBuffer = new NodeBuffer();
+ private final INodeBuffer nodeBuffer = new NodeBuffer();
- protected CacheManager(MemManager memManager) {
+ protected CacheManager(MemManager memManager, LockManager lockManager) {
this.memManager = memManager;
+ this.lockManager = lockManager;
}
public void initRootStatus(ICachedMNode root) {
@@ -125,121 +127,95 @@ public abstract class CacheManager implements
ICacheManager {
pinMNodeWithMemStatusUpdate(node);
CacheEntry cacheEntry = getCacheEntry(node);
cacheEntry.setVolatile(true);
+ // the ancestors must be processed first since the volatileDescendant
judgement is of higher
+ // priority than
+ // children container judgement
+ removeAncestorsFromCache(node);
getBelongedContainer(node).appendMNode(node);
- addToBufferAfterAppend(node);
+ nodeBuffer.addNewNodeToBuffer(node);
}
/**
- * look for the first none volatile ancestor of this node and add it to
nodeBuffer. Through this
- * the volatile subtree the given node belong to will be record in
nodeBuffer.
+ * The updated node should be marked volatile and removed from nodeCache if
necessary and the
+ * volatile subtree it belonged should be added to nodeBuffer.
*
* @param node
*/
- private void addToBufferAfterAppend(ICachedMNode node) {
- removeAncestorsFromCache(node);
- ICachedMNode parent = node.getParent();
- CacheEntry cacheEntry = getCacheEntry(parent);
- if (!cacheEntry.isVolatile()) {
- // the cacheEntry may be set to volatile concurrently, the unVolatile
node should not be added
- // to nodeBuffer, which prevent the duplicated collecting on subTree
- synchronized (cacheEntry) {
- if (!cacheEntry.isVolatile()) {
- nodeBuffer.put(cacheEntry, parent);
- }
+ @Override
+ public void updateCacheStatusAfterUpdate(ICachedMNode node) {
+ if (node.isDatabase()) {
+
nodeBuffer.updateDatabaseNodeAfterStatusUpdate(node.getAsDatabaseMNode());
+ return;
+ }
+
+ CacheEntry cacheEntry = getCacheEntry(node);
+ synchronized (cacheEntry) {
+ if (cacheEntry.isVolatile()) {
+ return;
}
+ // the status change affects the subTre collect in nodeBuffer
+ cacheEntry.setVolatile(true);
+ if (!cacheEntry.hasVolatileDescendant()) {
+ removeFromNodeCache(cacheEntry);
+ removeAncestorsFromCache(node);
+ }
+ getBelongedContainer(node).updateMNode(node.getName());
+ nodeBuffer.addUpdatedNodeToBuffer(node);
}
}
/**
- * The ancestors of volatile node should not stay in nodeCache in which the
node will be evicted.
- * When invoking this method, all the ancestors have been pinned.
+ * This method should be invoked after a node in memory be updated to
volatile status, which could
+ * be caused by addChild or updateNode in MTree. The ancestors of volatile
node should not stay in
+ * nodeCache in which the node will be evicted. When invoking this method,
all the ancestors have
+ * been pinned.
*/
private void removeAncestorsFromCache(ICachedMNode node) {
- ICachedMNode parent = node.getParent();
- ICachedMNode current = node;
- CacheEntry cacheEntry = getCacheEntry(parent);
- while (!current.isDatabase() && isInNodeCache(cacheEntry)) {
- removeFromNodeCache(cacheEntry);
- current = parent;
- parent = parent.getParent();
- cacheEntry = getCacheEntry(parent);
- }
- }
+ ICachedMNode current = node.getParent();
+ CacheEntry cacheEntry = getCacheEntry(current);
+ boolean isStatusChange;
- /**
- * The updated node should be marked volatile and removed from nodeCache if
necessary and the
- * volatile subtree it belonged should be added to nodeBuffer.
- *
- * @param node
- */
- @Override
- public void updateCacheStatusAfterUpdate(ICachedMNode node) {
- CacheEntry cacheEntry = getCacheEntry(node);
- if (!cacheEntry.isVolatile()) {
- if (!node.isDatabase()) {
- synchronized (cacheEntry) {
- // the status change affects the subTre collect in nodeBuffer
- cacheEntry.setVolatile(true);
+ while (!current.isDatabase()) {
+ isStatusChange = false;
+ synchronized (cacheEntry) {
+ if (!cacheEntry.hasVolatileDescendant()) {
+ cacheEntry.incVolatileDescendant();
+ isStatusChange = true;
+ removeFromNodeCache(cacheEntry);
+ } else {
+ cacheEntry.incVolatileDescendant();
+ }
+
+ if (!isStatusChange || cacheEntry.isVolatile()) {
+ return;
}
- // if node is StorageGroup, getBelongedContainer is null
- getBelongedContainer(node).updateMNode(node.getName());
- // MNode update operation like node replace may reset the mapping
between cacheEntry and
- // node,
- // thus it should be updated
- updateCacheStatusAfterUpdate(cacheEntry, node);
- removeFromNodeCache(cacheEntry);
}
- addToBufferAfterUpdate(node);
+
+ current = current.getParent();
+ cacheEntry = getCacheEntry(current);
}
}
/**
- * look for the first none volatile ancestor of this node and add it to
nodeBuffer. Through this
- * the volatile subtree the given node belong to will be record in
nodeBuffer.
- *
- * @param node
+ * This method should be invoked after a node in memory be updated to
non-volatile status, which
+ * could be caused by flushChildren or deleteChild in MTree.
*/
- private void addToBufferAfterUpdate(ICachedMNode node) {
- if (node.isDatabase()) {
- nodeBuffer.setUpdatedStorageGroupMNode(node.getAsDatabaseMNode());
- return;
- }
+ private void addAncestorsToCache(ICachedMNode node) {
+ ICachedMNode current = node.getParent();
+ CacheEntry cacheEntry = getCacheEntry(current);
- removeAncestorsFromCache(node);
- ICachedMNode parent = node.getParent();
- CacheEntry cacheEntry = getCacheEntry(parent);
-
- /*
- * The updated node may already exist in nodeBuffer, remove it and ensure
- * the volatile subtree it belongs to is in nodeBuffer
- */
- if (!cacheEntry.isVolatile()) {
- // make sure that the nodeBuffer contains all the root node of volatile
subTree
- // give that root.sg.d.s, if sg and d have been persisted and s are
volatile, then d
- // will be added to nodeBuffer
+ while (!current.isDatabase()) {
synchronized (cacheEntry) {
- if (!cacheEntry.isVolatile()) {
- nodeBuffer.put(cacheEntry, parent);
+ cacheEntry.decVolatileDescendant();
+ if (cacheEntry.hasVolatileDescendant() || cacheEntry.isVolatile()) {
+ return;
}
- }
- }
- nodeBuffer.remove(getCacheEntry(node));
- }
+ addToNodeCache(cacheEntry, current);
+ }
- private void addItselfAndAncestorToCache(ICachedMNode node) {
- // the volatile subtree of this node has been deleted, thus there's no
need to flush it
- // add the node and its ancestors to cache
- // if there's flush failure, such node and ancestors will be removed from
cache again by
- // #updateCacheStatusAfterFlushFailure
- ICachedMNode tmp = node;
- while (!tmp.isDatabase()
- && !isInNodeCache(getCacheEntry(tmp))
- && !getCachedMNodeContainer(tmp).hasChildrenInBuffer()
- && !getCacheEntry(tmp).isVolatile()) {
- nodeBuffer.remove(getCacheEntry(tmp));
- addToNodeCache(getCacheEntry(tmp), tmp);
- tmp = tmp.getParent();
+ current = current.getParent();
+ cacheEntry = getCacheEntry(current);
}
}
@@ -250,8 +226,8 @@ public abstract class CacheManager implements ICacheManager
{
*/
@Override
public IDatabaseMNode<ICachedMNode> collectUpdatedStorageGroupMNodes() {
- IDatabaseMNode<ICachedMNode> storageGroupMNode =
nodeBuffer.getUpdatedStorageGroupMNode();
- nodeBuffer.setUpdatedStorageGroupMNode(null);
+ IDatabaseMNode<ICachedMNode> storageGroupMNode =
nodeBuffer.getUpdatedDatabaseMNode();
+ nodeBuffer.removeUpdatedDatabaseNode();
return storageGroupMNode;
}
@@ -283,15 +259,18 @@ public abstract class CacheManager implements
ICacheManager {
private void tryGetNext() {
ICachedMNode node;
- while (nodeBufferIterator.hasNext()) {
+ if (nodeBufferIterator.hasNext()) {
node = nodeBufferIterator.next();
+
+ // prevent this node being added to nodeBuffer during flush
+ // unlock in PBTreeFlushExecutor
+ lockManager.writeLock(node);
+
+ // if there's flush failure, such node and ancestors will be removed
from cache again by
+ // #updateCacheStatusAfterFlushFailure
nodeBuffer.remove(getCacheEntry(node));
- if (getCachedMNodeContainer(node).hasChildrenInBuffer()) {
- nextSubtree = node;
- return;
- } else if (!node.isDatabase()) {
- addItselfAndAncestorToCache(node);
- }
+
+ nextSubtree = node;
}
}
};
@@ -335,36 +314,48 @@ public abstract class CacheManager implements
ICacheManager {
private void tryGetNext() {
ICachedMNode node;
- ICachedMNode tmp;
CacheEntry cacheEntry;
while (bufferedNodeIterator.hasNext()) {
node = bufferedNodeIterator.next();
- // update cache status after persist
- // when process one node, all of its buffered child should be moved to
cache
- // except those with volatile children
- cacheEntry = getCacheEntry(node);
- cacheEntry.setVolatile(false);
- container.moveMNodeToCache(node.getName());
+ // prevent this node being added buffer during the following check and
potential flush
+ // unlock in PBTreeFlushExecutor
+ lockManager.writeLock(node);
+
+ boolean unlockImmediately = true;
+
+ try {
+ // update cache status after persist
+ // when process one node, all of its buffered child should be moved
to cache
+ // except those with volatile children
+ cacheEntry = getCacheEntry(node);
+
+ synchronized (cacheEntry) {
+ cacheEntry.setVolatile(false);
+ container.moveMNodeToCache(node.getName());
+
+ if (cacheEntry.hasVolatileDescendant()
+ && getCachedMNodeContainer(node).hasChildrenInBuffer()) {
+ // these two factor judgement is not redundant because the
#hasVolatileDescendant is
+ // on a higher priority than #container.hasChildren
+
+ // nodes with volatile children should be treated as root of
volatile subtree and
+ // return
+ // for flush
+ nextSubtree = node;
+ unlockImmediately = false;
+ return;
+ }
- if (node.isMeasurement() ||
!getCachedMNodeContainer(node).hasChildrenInBuffer()) {
- // there's no volatile subtree under this node, thus there's no need
to flush it
- // add the node and its ancestors to cache
- // if there's flush failure, such node and ancestors will be removed
from cache again by
- // #updateCacheStatusAfterFlushFailure
- addToNodeCache(cacheEntry, node);
- tmp = node.getParent();
- while (!tmp.isDatabase()
- && !isInNodeCache(getCacheEntry(tmp))
- && !getCachedMNodeContainer(tmp).hasChildrenInBuffer()) {
- addToNodeCache(getCacheEntry(tmp), tmp);
- tmp = tmp.getParent();
+ // there's no direct volatile subtree under this node, thus
there's no need to flush it
+ // add the node and its ancestors to cache
+ addToNodeCache(cacheEntry, node);
+ addAncestorsToCache(node);
+ }
+ } finally {
+ if (unlockImmediately) {
+ lockManager.writeUnlock(node);
}
- } else {
- // nodes with volatile children should be treated as root of
volatile subtree and return
- // for flush
- nextSubtree = node;
- return;
}
}
}
@@ -372,45 +363,39 @@ public abstract class CacheManager implements
ICacheManager {
@Override
public void updateCacheStatusAfterFlushFailure(ICachedMNode subtreeRoot) {
- nodeBuffer.put(getCacheEntry(subtreeRoot), subtreeRoot);
- if (subtreeRoot.isDatabase()) {
- return;
- }
- removeAncestorsFromCache(subtreeRoot);
+ nodeBuffer.addBackToBufferAfterFlushFailure(subtreeRoot);
}
+ /**
+ * When this method is invoked, it should be guaranteed that the target node
has no descendent.
+ */
@Override
public void remove(ICachedMNode node) {
- removeRecursively(node);
- addItselfAndAncestorToCache(node.getParent());
- }
+ CacheEntry cacheEntry = node.getCacheEntry();
+ synchronized (cacheEntry) {
+ if (cacheEntry.hasVolatileDescendant()) {
+ throw new IllegalStateException(
+ String.format(
+ "There should not exist descendant under this node %s",
node.getFullPath()));
+ }
+ if (cacheEntry.isVolatile()) {
+ addAncestorsToCache(node);
+ if (!getCacheEntry(node.getParent()).hasVolatileDescendant()) {
+ nodeBuffer.remove(getCacheEntry(node.getParent()));
+ }
+ } else {
+ removeFromNodeCache(cacheEntry);
+ }
- private void removeOne(CacheEntry cacheEntry, ICachedMNode node) {
- if (isInNodeCache(cacheEntry)) {
- removeFromNodeCache(cacheEntry);
- } else {
- nodeBuffer.remove(cacheEntry);
+ node.setCacheEntry(null);
}
- node.setCacheEntry(null);
-
if (cacheEntry.isPinned()) {
memManager.releasePinnedMemResource(node);
}
memManager.releaseMemResource(node);
}
- private void removeRecursively(ICachedMNode node) {
- CacheEntry cacheEntry = getCacheEntry(node);
- if (cacheEntry == null) {
- return;
- }
- removeOne(cacheEntry, node);
- for (ICachedMNode child : node.getChildren().values()) {
- removeRecursively(child);
- }
- }
-
/**
* Choose an evictable node from nodeCache and evicted all the cached node
in the subtree it
* represented.
@@ -419,40 +404,56 @@ public abstract class CacheManager implements
ICacheManager {
*/
@Override
public synchronized boolean evict() {
- ICachedMNode node = null;
- CacheEntry cacheEntry = null;
- List<ICachedMNode> evictedMNodes = new ArrayList<>();
- boolean isSuccess = false;
- while (!isSuccess) {
- node = getPotentialNodeTobeEvicted();
- if (node == null) {
- break;
- }
- cacheEntry = getCacheEntry(node);
- // the operation that may change the cache status of a node should be
synchronized
- synchronized (cacheEntry) {
- if (!cacheEntry.isPinned() && isInNodeCache(cacheEntry)) {
- getBelongedContainer(node).evictMNode(node.getName());
- if (node.isMeasurement()) {
- String alias = node.getAsMeasurementMNode().getAlias();
- if (alias != null) {
- node.getParent().getAsDeviceMNode().deleteAliasChild(alias);
+ lockManager.globalReadLock();
+ try {
+ ICachedMNode node = null;
+ CacheEntry cacheEntry = null;
+ List<ICachedMNode> evictedMNodes = new ArrayList<>();
+ boolean isSuccess = false;
+ while (!isSuccess) {
+ node = getPotentialNodeTobeEvicted();
+ if (node == null) {
+ break;
+ }
+ lockManager.threadReadLock(node.getParent(), true);
+ try {
+ cacheEntry = getCacheEntry(node);
+ // the operation that may change the cache status of a node should
be synchronized
+ synchronized (cacheEntry) {
+ if (cacheEntry.isPinned()
+ || cacheEntry.isVolatile()
+ || cacheEntry.hasVolatileDescendant()) {
+ // since the node could be moved from cache to buffer after
being taken from cache
+ // this check here is necessary to ensure that the node could
truly be evicted
+ continue;
+ }
+
+ getBelongedContainer(node).evictMNode(node.getName());
+ if (node.isMeasurement()) {
+ String alias = node.getAsMeasurementMNode().getAlias();
+ if (alias != null) {
+ node.getParent().getAsDeviceMNode().deleteAliasChild(alias);
+ }
}
+ removeFromNodeCache(getCacheEntry(node));
+ node.setCacheEntry(null);
+ evictedMNodes.add(node);
+ isSuccess = true;
}
- removeFromNodeCache(getCacheEntry(node));
- node.setCacheEntry(null);
- evictedMNodes.add(node);
- isSuccess = true;
+ } finally {
+ lockManager.threadReadUnlock(node.getParent());
}
}
- }
- if (node != null) {
- collectEvictedMNodes(node, evictedMNodes);
- }
+ if (node != null) {
+ collectEvictedMNodes(node, evictedMNodes);
+ }
- memManager.releaseMemResource(evictedMNodes);
- return !evictedMNodes.isEmpty();
+ memManager.releaseMemResource(evictedMNodes);
+ return !evictedMNodes.isEmpty();
+ } finally {
+ lockManager.globalReadUnlock();
+ }
}
private void collectEvictedMNodes(ICachedMNode node, List<ICachedMNode>
evictedMNodes) {
@@ -554,7 +555,7 @@ public abstract class CacheManager implements ICacheManager
{
public void clear(ICachedMNode root) {
clearMNodeInMemory(root);
clearNodeCache();
- nodeBuffer.setUpdatedStorageGroupMNode(null);
+ nodeBuffer.removeUpdatedDatabaseNode();
nodeBuffer.clear();
}
@@ -590,12 +591,6 @@ public abstract class CacheManager implements
ICacheManager {
protected abstract void updateCacheStatusAfterAccess(CacheEntry cacheEntry);
- // MNode update operation like node replace may reset the mapping between
cacheEntry and node,
- // thus it should be updated
- protected abstract void updateCacheStatusAfterUpdate(CacheEntry cacheEntry,
ICachedMNode node);
-
- protected abstract boolean isInNodeCache(CacheEntry cacheEntry);
-
protected abstract void addToNodeCache(CacheEntry cacheEntry, ICachedMNode
node);
protected abstract void removeFromNodeCache(CacheEntry cacheEntry);
@@ -603,139 +598,4 @@ public abstract class CacheManager implements
ICacheManager {
protected abstract ICachedMNode getPotentialNodeTobeEvicted();
protected abstract void clearNodeCache();
-
- private static class NodeBuffer {
-
- private static final int MAP_NUM = 17;
-
- private IDatabaseMNode<ICachedMNode> updatedStorageGroupMNode;
- private final Map<CacheEntry, ICachedMNode>[] maps = new Map[MAP_NUM];
-
- private final Map<Integer, NodeBufferIterator> currentIteratorMap = new
ConcurrentHashMap<>();
-
- NodeBuffer() {
- for (int i = 0; i < MAP_NUM; i++) {
- maps[i] = new ConcurrentHashMap<>();
- }
- }
-
- public IDatabaseMNode<ICachedMNode> getUpdatedStorageGroupMNode() {
- return updatedStorageGroupMNode;
- }
-
- public void setUpdatedStorageGroupMNode(IDatabaseMNode<ICachedMNode>
updatedStorageGroupMNode) {
- this.updatedStorageGroupMNode = updatedStorageGroupMNode;
- }
-
- void put(CacheEntry cacheEntry, ICachedMNode node) {
- maps[getLoc(cacheEntry)].put(cacheEntry, node);
- if (!currentIteratorMap.isEmpty()) {
- for (NodeBufferIterator nodeBufferIterator :
currentIteratorMap.values()) {
- nodeBufferIterator.checkHasNew(getLoc(cacheEntry));
- }
- }
- }
-
- void remove(CacheEntry cacheEntry) {
- maps[getLoc(cacheEntry)].remove(cacheEntry);
- }
-
- long getBufferNodeNum() {
- long res = updatedStorageGroupMNode == null ? 0 : 1;
- for (int i = 0; i < MAP_NUM; i++) {
- res += maps[i].size();
- }
- return res;
- }
-
- void clear() {
- for (Map<CacheEntry, ICachedMNode> map : maps) {
- map.clear();
- }
- }
-
- private int getLoc(CacheEntry cacheEntry) {
- int hash = cacheEntry.hashCode() % MAP_NUM;
- return hash < 0 ? hash + MAP_NUM : hash;
- }
-
- Iterator<ICachedMNode> iterator() {
- NodeBufferIterator iterator = new NodeBufferIterator();
- currentIteratorMap.put(iterator.hashCode, iterator);
- return iterator;
- }
-
- private class NodeBufferIterator implements Iterator<ICachedMNode> {
- volatile int mapIndex = 0;
- Iterator<ICachedMNode> currentIterator = maps[0].values().iterator();
-
- ICachedMNode nextNode = null;
-
- volatile boolean hasNew = false;
-
- private final int hashCode = super.hashCode();
-
- @Override
- public boolean hasNext() {
- if (nextNode == null) {
- tryGetNext();
- if (nextNode == null && hasNew) {
- synchronized (this) {
- hasNew = false;
- mapIndex = 0;
- }
- currentIterator = maps[0].values().iterator();
- tryGetNext();
- }
- }
- if (nextNode == null) {
- currentIteratorMap.remove(hashCode);
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public ICachedMNode next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- ICachedMNode result = nextNode;
- nextNode = null;
- return result;
- }
-
- private void tryGetNext() {
- if (mapIndex >= maps.length) {
- return;
- }
-
- while (!currentIterator.hasNext()) {
- currentIterator = null;
-
- synchronized (this) {
- mapIndex++;
- }
-
- if (mapIndex == maps.length) {
- return;
- }
- currentIterator = maps[mapIndex].values().iterator();
- }
-
- nextNode = currentIterator.next();
- }
-
- private void checkHasNew(int index) {
- if (mapIndex >= index) {
- synchronized (this) {
- if (mapIndex >= index) {
- hasNew = true;
- }
- }
- }
- }
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
index ddf05c20b82..d5c6bec1e28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
@@ -80,8 +81,9 @@ public class CacheMemoryManager {
* @param store CachedMTreeStore
* @return LRUCacheManager
*/
- public ICacheManager createLRUCacheManager(CachedMTreeStore store,
MemManager memManager) {
- ICacheManager cacheManager = new LRUCacheManager(memManager);
+ public ICacheManager createLRUCacheManager(
+ CachedMTreeStore store, MemManager memManager, LockManager lockManager) {
+ ICacheManager cacheManager = new LRUCacheManager(memManager, lockManager);
storeList.add(store);
return cacheManager;
}
@@ -210,12 +212,7 @@ public class CacheMemoryManager {
store ->
CompletableFuture.runAsync(
() -> {
- store.getLock().threadReadLock(true);
- try {
- executeMemoryRelease(store);
- } finally {
- store.getLock().threadReadUnlock();
- }
+ executeMemoryRelease(store);
},
releaseTaskProcessor))
.toArray(CompletableFuture[]::new))
@@ -260,13 +257,7 @@ public class CacheMemoryManager {
store ->
CompletableFuture.runAsync(
() -> {
- store.getLock().writeLock();
- try {
- store.flushVolatileNodes();
- executeMemoryRelease(store);
- } finally {
- store.getLock().unlockWrite();
- }
+ store.flushVolatileNodes(true);
},
flushTaskProcessor))
.toArray(CompletableFuture[]::new))
@@ -359,5 +350,6 @@ public class CacheMemoryManager {
}
};
registerFlushTask();
+ registerReleaseTask();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ICacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ICacheManager.java
index 1a5859f34dc..c50f01317fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ICacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/ICacheManager.java
@@ -39,8 +39,16 @@ public interface ICacheManager {
IDatabaseMNode<ICachedMNode> collectUpdatedStorageGroupMNodes();
+ /**
+ * The returned node by iterator will automatically take the write lock.
Please unlock the node
+ * after process.
+ */
Iterator<ICachedMNode> collectVolatileSubtrees();
+ /**
+ * The returned node by iterator will automatically take the write lock.
Please unlock the node
+ * after process.
+ */
Iterator<ICachedMNode>
updateCacheStatusAndRetrieveSubtreeAfterPersist(ICachedMNode subtreeRoot);
void updateCacheStatusAfterFlushFailure(ICachedMNode subtreeRoot);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/INodeBuffer.java
similarity index 59%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/INodeBuffer.java
index 656b4f94a95..121ba6cfa6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/INodeBuffer.java
@@ -16,30 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.info;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.BasicMNodeInfo;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
-public class CacheMNodeInfo extends BasicMNodeInfo {
+import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
- private CacheEntry cacheEntry;
+import java.util.Iterator;
- public CacheMNodeInfo(String name) {
- super(name);
- }
+public interface INodeBuffer {
- public CacheEntry getCacheEntry() {
- return cacheEntry;
- }
+ IDatabaseMNode<ICachedMNode> getUpdatedDatabaseMNode();
- public void setCacheEntry(CacheEntry cacheEntry) {
- this.cacheEntry = cacheEntry;
- }
+ void updateDatabaseNodeAfterStatusUpdate(IDatabaseMNode<ICachedMNode>
updatedDatabaseMNode);
- @Override
- public int estimateSize() {
- // Estimated size of CacheEntry = 40
- return super.estimateSize() + 40;
- }
+ void removeUpdatedDatabaseNode();
+
+ void addNewNodeToBuffer(ICachedMNode node);
+
+ void addUpdatedNodeToBuffer(ICachedMNode node);
+
+ void addBackToBufferAfterFlushFailure(ICachedMNode subTreeRoot);
+
+ void remove(CacheEntry cacheEntry);
+
+ long getBufferNodeNum();
+
+ void clear();
+
+ Iterator<ICachedMNode> iterator();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/LRUCacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/LRUCacheManager.java
index 9c044acb932..d1939a526c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/LRUCacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/LRUCacheManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -33,8 +34,8 @@ public class LRUCacheManager extends CacheManager {
private final LRUCacheList[] lruCacheLists = new LRUCacheList[NUM_OF_LIST];
- public LRUCacheManager(MemManager memManager) {
- super(memManager);
+ public LRUCacheManager(MemManager memManager, LockManager lockManager) {
+ super(memManager, lockManager);
for (int i = 0; i < NUM_OF_LIST; i++) {
lruCacheLists[i] = new LRUCacheList();
}
@@ -46,29 +47,16 @@ public class LRUCacheManager extends CacheManager {
getTargetCacheList(lruCacheEntry).updateCacheStatusAfterAccess(lruCacheEntry);
}
- // MNode update operation like node replace may reset the mapping between
cacheEntry and node,
- // thus it should be updated
- @Override
- protected void updateCacheStatusAfterUpdate(CacheEntry cacheEntry,
ICachedMNode node) {
- getAsLRUCacheEntry(cacheEntry).setNode(node);
- }
-
@Override
protected void initCacheEntryForNode(ICachedMNode node) {
LRUCacheEntry cacheEntry = new LRUCacheEntry(node);
node.setCacheEntry(cacheEntry);
}
- @Override
- protected boolean isInNodeCache(CacheEntry cacheEntry) {
- LRUCacheEntry lruCacheEntry = getAsLRUCacheEntry(cacheEntry);
- return getTargetCacheList(lruCacheEntry).isInCacheList(lruCacheEntry);
- }
-
@Override
protected void addToNodeCache(CacheEntry cacheEntry, ICachedMNode node) {
LRUCacheEntry lruCacheEntry = getAsLRUCacheEntry(cacheEntry);
- getTargetCacheList(lruCacheEntry).addToCacheList(lruCacheEntry, node);
+ getTargetCacheList(lruCacheEntry).addToCacheList(lruCacheEntry);
}
@Override
@@ -121,9 +109,7 @@ public class LRUCacheManager extends CacheManager {
@SuppressWarnings("java:S3077")
private static class LRUCacheEntry extends CacheEntry {
- // although the node instance may be replaced, the name and full path of
the node won't be
- // changed, which means the cacheEntry always map to only one logic node
- protected volatile ICachedMNode node;
+ protected final ICachedMNode node;
private volatile LRUCacheEntry pre = null;
@@ -137,10 +123,6 @@ public class LRUCacheManager extends CacheManager {
return node;
}
- public void setNode(ICachedMNode node) {
- this.node = node;
- }
-
LRUCacheEntry getPre() {
return pre;
}
@@ -195,12 +177,15 @@ public class LRUCacheManager extends CacheManager {
}
}
- private void addToCacheList(LRUCacheEntry lruCacheEntry, ICachedMNode
node) {
+ private void addToCacheList(LRUCacheEntry lruCacheEntry) {
lock.lock();
try {
- lruCacheEntry.setNode(node);
- moveToFirst(lruCacheEntry);
- size.getAndIncrement();
+ if (isInCacheList(lruCacheEntry)) {
+ moveToFirst(lruCacheEntry);
+ } else {
+ moveToFirst(lruCacheEntry);
+ size.getAndIncrement();
+ }
} finally {
lock.unlock();
}
@@ -209,8 +194,10 @@ public class LRUCacheManager extends CacheManager {
private void removeFromCacheList(LRUCacheEntry lruCacheEntry) {
lock.lock();
try {
- removeOne(lruCacheEntry);
- size.getAndDecrement();
+ if (isInCacheList(lruCacheEntry)) {
+ removeOne(lruCacheEntry);
+ size.getAndDecrement();
+ }
} finally {
lock.unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/NodeBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/NodeBuffer.java
new file mode 100644
index 00000000000..784323f5eba
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/NodeBuffer.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
+
+import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class NodeBuffer implements INodeBuffer {
+
+ private static final int MAP_NUM = 17;
+
+ private IDatabaseMNode<ICachedMNode> updatedDatabaseMNode;
+ private final Map<CacheEntry, ICachedMNode>[] maps = new Map[MAP_NUM];
+
+ private final Map<Integer, NodeBufferIterator> currentIteratorMap = new
ConcurrentHashMap<>();
+
+ NodeBuffer() {
+ for (int i = 0; i < MAP_NUM; i++) {
+ maps[i] = new ConcurrentHashMap<>();
+ }
+ }
+
+ @Override
+ public IDatabaseMNode<ICachedMNode> getUpdatedDatabaseMNode() {
+ return updatedDatabaseMNode;
+ }
+
+ @Override
+ public void updateDatabaseNodeAfterStatusUpdate(
+ IDatabaseMNode<ICachedMNode> updatedDatabaseMNode) {
+ this.updatedDatabaseMNode = updatedDatabaseMNode;
+ }
+
+ @Override
+ public void removeUpdatedDatabaseNode() {
+ this.updatedDatabaseMNode = null;
+ }
+
+ @Override
+ public void addNewNodeToBuffer(ICachedMNode node) {
+ addNonVolatileAncestorToBuffer(node);
+ }
+
+ @Override
+ public void addUpdatedNodeToBuffer(ICachedMNode node) {
+ /*
+ * The node may already exist in nodeBuffer before change it to volatile
status, remove it and ensure
+ * the volatile subtree it belongs to is in nodeBuffer
+ */
+ remove(node.getCacheEntry());
+ addNonVolatileAncestorToBuffer(node);
+ }
+
+ @Override
+ public void addBackToBufferAfterFlushFailure(ICachedMNode subTreeRoot) {
+ put(subTreeRoot.getCacheEntry(), subTreeRoot);
+ }
+
+ /**
+ * look for the first none volatile ancestor of this node and add it to
nodeBuffer. Through this
+ * the volatile subtree the given node belong to will be record in
nodeBuffer.
+ */
+ private void addNonVolatileAncestorToBuffer(ICachedMNode node) {
+ ICachedMNode parent = node.getParent();
+ CacheEntry cacheEntry = parent.getCacheEntry();
+
+ // make sure that the nodeBuffer contains all the root node of volatile
subTree
+ // give that root.sg.d.s, if sg and d have been persisted and s are
volatile, then d
+ // will be added to nodeBuffer
+ synchronized (cacheEntry) {
+ // the cacheEntry may be set to volatile concurrently, the unVolatile
node should not be
+ // added
+ // to nodeBuffer, which prevent the duplicated collecting on subTree
+ if (!cacheEntry.isVolatile()) {
+ put(cacheEntry, parent);
+ }
+ }
+ }
+
+ private void put(CacheEntry cacheEntry, ICachedMNode node) {
+ maps[getLoc(cacheEntry)].put(cacheEntry, node);
+ if (!currentIteratorMap.isEmpty()) {
+ for (NodeBufferIterator nodeBufferIterator :
currentIteratorMap.values()) {
+ nodeBufferIterator.checkHasNew(getLoc(cacheEntry));
+ }
+ }
+ }
+
+ @Override
+ public void remove(CacheEntry cacheEntry) {
+ maps[getLoc(cacheEntry)].remove(cacheEntry);
+ }
+
+ @Override
+ public long getBufferNodeNum() {
+ long res = updatedDatabaseMNode == null ? 0 : 1;
+ for (int i = 0; i < MAP_NUM; i++) {
+ res += maps[i].size();
+ }
+ return res;
+ }
+
+ @Override
+ public void clear() {
+ for (Map<CacheEntry, ICachedMNode> map : maps) {
+ map.clear();
+ }
+ }
+
+ private int getLoc(CacheEntry cacheEntry) {
+ int hash = cacheEntry.hashCode() % MAP_NUM;
+ return hash < 0 ? hash + MAP_NUM : hash;
+ }
+
+ @Override
+ public Iterator<ICachedMNode> iterator() {
+ NodeBufferIterator iterator = new NodeBufferIterator();
+ currentIteratorMap.put(iterator.hashCode, iterator);
+ return iterator;
+ }
+
+ private class NodeBufferIterator implements Iterator<ICachedMNode> {
+ volatile int mapIndex = 0;
+ Iterator<ICachedMNode> currentIterator = maps[0].values().iterator();
+
+ ICachedMNode nextNode = null;
+
+ volatile boolean hasNew = false;
+
+ private final int hashCode = super.hashCode();
+
+ @Override
+ public boolean hasNext() {
+ if (nextNode == null) {
+ tryGetNext();
+ if (nextNode == null && hasNew) {
+ synchronized (this) {
+ hasNew = false;
+ mapIndex = 0;
+ }
+ currentIterator = maps[0].values().iterator();
+ tryGetNext();
+ }
+ }
+ if (nextNode == null) {
+ currentIteratorMap.remove(hashCode);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public ICachedMNode next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ ICachedMNode result = nextNode;
+ nextNode = null;
+ return result;
+ }
+
+ private void tryGetNext() {
+ if (mapIndex >= maps.length) {
+ return;
+ }
+
+ while (!currentIterator.hasNext()) {
+ currentIterator = null;
+
+ synchronized (this) {
+ mapIndex++;
+ }
+
+ if (mapIndex == maps.length) {
+ return;
+ }
+ currentIterator = maps[mapIndex].values().iterator();
+ }
+
+ nextNode = currentIterator.next();
+ }
+
+ private void checkHasNew(int index) {
+ if (mapIndex >= index) {
+ synchronized (this) {
+ if (mapIndex >= index) {
+ hasNew = true;
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/PlainCacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/PlainCacheManager.java
index 14cefdc9f4c..a6e0553bbe1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/PlainCacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/PlainCacheManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -32,25 +33,13 @@ public class PlainCacheManager extends CacheManager {
@SuppressWarnings("java:S3077")
private volatile Map<CacheEntry, ICachedMNode> nodeCache = new
ConcurrentHashMap<>();
- public PlainCacheManager(MemManager memManager) {
- super(memManager);
+ public PlainCacheManager(MemManager memManager, LockManager lockManager) {
+ super(memManager, lockManager);
}
@Override
protected void updateCacheStatusAfterAccess(CacheEntry cacheEntry) {}
- // MNode update operation like node replace may reset the mapping between
cacheEntry and node,
- // thus it should be updated
- @Override
- protected void updateCacheStatusAfterUpdate(CacheEntry cacheEntry,
ICachedMNode node) {
- nodeCache.replace(cacheEntry, node);
- }
-
- @Override
- protected boolean isInNodeCache(CacheEntry cacheEntry) {
- return nodeCache.containsKey(cacheEntry);
- }
-
@Override
protected void addToNodeCache(CacheEntry cacheEntry, ICachedMNode node) {
nodeCache.put(cacheEntry, node);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
index 6a6027c6909..614c28087fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.ICacheManager;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
@@ -29,8 +30,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
+import java.util.List;
public class PBTreeFlushExecutor {
@@ -42,16 +45,31 @@ public class PBTreeFlushExecutor {
private final ISchemaFile file;
+ private final LockManager lockManager;
+
public PBTreeFlushExecutor(
- ICachedMNode subtreeRoot, ICacheManager cacheManager, ISchemaFile file) {
+ ICachedMNode subtreeRoot,
+ boolean needLock,
+ ICacheManager cacheManager,
+ ISchemaFile file,
+ LockManager lockManager) {
this.subtreeRoot = subtreeRoot;
this.cacheManager = cacheManager;
this.file = file;
+ this.lockManager = lockManager;
}
public void flushVolatileNodes() throws MetadataException, IOException {
+ Iterator<ICachedMNode> volatileSubtreeIterator;
+ List<ICachedMNode> collectedVolatileSubtrees;
try {
file.writeMNode(this.subtreeRoot);
+ collectedVolatileSubtrees = new ArrayList<>();
+ volatileSubtreeIterator =
+
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(this.subtreeRoot);
+ while (volatileSubtreeIterator.hasNext()) {
+ collectedVolatileSubtrees.add(volatileSubtreeIterator.next());
+ }
} catch (MetadataException | IOException e) {
logger.warn(
"Error occurred during MTree flush, current node is {}",
@@ -59,11 +77,12 @@ public class PBTreeFlushExecutor {
e);
cacheManager.updateCacheStatusAfterFlushFailure(this.subtreeRoot);
throw e;
+ } finally {
+ lockManager.writeUnlock(subtreeRoot);
}
Deque<Iterator<ICachedMNode>> volatileSubtreeStack = new ArrayDeque<>();
- volatileSubtreeStack.push(
-
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(this.subtreeRoot));
+ volatileSubtreeStack.push(collectedVolatileSubtrees.iterator());
Iterator<ICachedMNode> subtreeIterator;
ICachedMNode subtreeRoot;
@@ -78,15 +97,22 @@ public class PBTreeFlushExecutor {
try {
file.writeMNode(subtreeRoot);
+ collectedVolatileSubtrees = new ArrayList<>();
+ volatileSubtreeIterator =
+
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(subtreeRoot);
+ while (volatileSubtreeIterator.hasNext()) {
+ collectedVolatileSubtrees.add(volatileSubtreeIterator.next());
+ }
} catch (MetadataException | IOException e) {
logger.warn(
"Error occurred during MTree flush, current node is {}",
subtreeRoot.getFullPath(), e);
processNotFlushedSubtrees(subtreeRoot, volatileSubtreeStack);
throw e;
+ } finally {
+ lockManager.writeUnlock(subtreeRoot);
}
- volatileSubtreeStack.push(
-
cacheManager.updateCacheStatusAndRetrieveSubtreeAfterPersist(subtreeRoot));
+ volatileSubtreeStack.push(collectedVolatileSubtrees.iterator());
}
}
@@ -94,10 +120,13 @@ public class PBTreeFlushExecutor {
ICachedMNode currentNode, Deque<Iterator<ICachedMNode>>
volatileSubtreeStack) {
cacheManager.updateCacheStatusAfterFlushFailure(currentNode);
Iterator<ICachedMNode> subtreeIterator;
+ ICachedMNode node;
while (!volatileSubtreeStack.isEmpty()) {
subtreeIterator = volatileSubtreeStack.pop();
while (subtreeIterator.hasNext()) {
-
cacheManager.updateCacheStatusAfterFlushFailure(subtreeIterator.next());
+ node = subtreeIterator.next();
+ cacheManager.updateCacheStatusAfterFlushFailure(node);
+ lockManager.writeUnlock(node);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/LockEntry.java
similarity index 71%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/LockEntry.java
index 1a686132571..6c59a1b8208 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/LockEntry.java
@@ -16,33 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;
-import java.util.concurrent.atomic.AtomicInteger;
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock;
-public class CacheEntry {
+import java.util.concurrent.atomic.AtomicInteger;
- private volatile boolean isVolatile = false;
+public class LockEntry {
- private final AtomicInteger semaphore = new AtomicInteger();
+ private final AtomicInteger pinSemaphore = new AtomicInteger(0);
- public boolean isVolatile() {
- return isVolatile;
- }
-
- public void setVolatile(boolean aVolatile) {
- isVolatile = aVolatile;
- }
+ private final StampedWriterPreferredLock lock = new
StampedWriterPreferredLock();
public void pin() {
- semaphore.getAndIncrement();
+ pinSemaphore.getAndIncrement();
}
- public void unPin() {
- semaphore.getAndDecrement();
+ public void unpin() {
+ pinSemaphore.getAndDecrement();
}
public boolean isPinned() {
- return semaphore.get() > 0;
+ return pinSemaphore.get() > 0;
+ }
+
+ public StampedWriterPreferredLock getLock() {
+ return lock;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/LockManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/LockManager.java
new file mode 100644
index 00000000000..af25af6505a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/LockManager.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LockManager {
+
+ private final LockPool lockPool = new LockPool();
+
+ private final StampedWriterPreferredLock readWriteLock = new
StampedWriterPreferredLock();
+
+ public long stampedReadLock(ICachedMNode node) {
+ AtomicLong stamp = new AtomicLong();
+ takeMNodeLock(node, lock -> stamp.set(lock.stampedReadLock()));
+ return stamp.get();
+ }
+
+ public void stampedReadUnlock(ICachedMNode node, long stamp) {
+ checkAndReleaseMNodeLock(node, lock -> lock.stampedReadUnlock(stamp));
+ }
+
+ public void threadReadLock(ICachedMNode node) {
+ takeMNodeLock(node, StampedWriterPreferredLock::threadReadLock);
+ }
+
+ public void threadReadLock(ICachedMNode node, boolean prior) {
+ takeMNodeLock(node, lock -> lock.threadReadLock(prior));
+ }
+
+ public void threadReadUnlock(ICachedMNode node) {
+ checkAndReleaseMNodeLock(node,
StampedWriterPreferredLock::threadReadUnlock);
+ }
+
+ public void writeLock(ICachedMNode node) {
+ takeMNodeLock(node, StampedWriterPreferredLock::writeLock);
+ }
+
+ public void writeUnlock(ICachedMNode node) {
+ checkAndReleaseMNodeLock(node, StampedWriterPreferredLock::writeUnlock);
+ }
+
+ private void takeMNodeLock(
+ ICachedMNode node, Consumer<StampedWriterPreferredLock> lockOperation) {
+ LockEntry lockEntry;
+ synchronized (this) {
+ lockEntry = node.getLockEntry();
+ if (lockEntry == null) {
+ lockEntry = lockPool.borrowLock();
+ node.setLockEntry(lockEntry);
+ }
+ lockEntry.pin();
+ }
+ lockOperation.accept(lockEntry.getLock());
+ }
+
+ private void checkAndReleaseMNodeLock(
+ ICachedMNode node, Consumer<StampedWriterPreferredLock> unLockOperation)
{
+ synchronized (this) {
+ LockEntry lockEntry = node.getLockEntry();
+ StampedWriterPreferredLock lock = lockEntry.getLock();
+ unLockOperation.accept(lock);
+ lockEntry.unpin();
+ if (lock.isFree() && !lockEntry.isPinned()) {
+ node.setLockEntry(null);
+ lockPool.returnLock(lockEntry);
+ }
+ }
+ }
+
+ public long globalStampedReadLock() {
+ return readWriteLock.stampedReadLock();
+ }
+
+ public void globalStampedReadUnlock(long stamp) {
+ readWriteLock.stampedReadUnlock(stamp);
+ }
+
+ public void globalReadLock() {
+ readWriteLock.threadReadLock();
+ }
+
+ public void globalReadUnlock() {
+ readWriteLock.threadReadUnlock();
+ }
+
+ public void globalWriteLock() {
+ readWriteLock.writeLock();
+ }
+
+ public void globalWriteUnlock() {
+ readWriteLock.writeUnlock();
+ }
+
+ private static class LockPool {
+ private static final int LOCK_POOL_CAPACITY = 400;
+
+ private final List<LockEntry> lockList = new LinkedList<>();
+
+ private LockPool() {
+ for (int i = 0; i < LOCK_POOL_CAPACITY; i++) {
+ lockList.add(new LockEntry());
+ }
+ }
+
+ private LockEntry borrowLock() {
+ synchronized (lockList) {
+ if (lockList.isEmpty()) {
+ return new LockEntry();
+ } else {
+ return lockList.remove(0);
+ }
+ }
+ }
+
+ private void returnLock(LockEntry lockEntry) {
+ synchronized (lockList) {
+ if (lockList.size() == LOCK_POOL_CAPACITY) {
+ return;
+ }
+ lockList.add(0, lockEntry);
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
similarity index 94%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
index c48624c6a3b..83961523af9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/lock/StampedWriterPreferredLock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree;
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock;
import java.util.HashMap;
import java.util.Map;
@@ -58,6 +58,9 @@ public class StampedWriterPreferredLock {
private volatile int writeWait = 0;
private final ThreadLocal<Long> sharedOwnerStamp = new ThreadLocal<>();
+
+ private final int hasCode = super.hashCode();
+
/**
* Acquires the stamp-bound read lock. Read lock acquire and release is
stamp-bound and supports
* re-entry by the same stamp. Return a new stamp if no thread holds a write
lock; block and wait
@@ -202,7 +205,7 @@ public class StampedWriterPreferredLock {
}
/** Unlock WriteLock */
- public void unlockWrite() {
+ public void writeUnlock() {
lock.lock();
try {
writeCnt--;
@@ -217,4 +220,23 @@ public class StampedWriterPreferredLock {
lock.unlock();
}
}
+
+ public boolean isFree() {
+ lock.lock();
+ try {
+ return readCnt.isEmpty() && readWait == 0 && writeCnt == 0 && writeWait
== 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return hasCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
index 1b8245167e9..129c38913d0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/ICachedMNode.java
@@ -20,9 +20,14 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode;
import org.apache.iotdb.commons.schema.node.IMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockEntry;
public interface ICachedMNode extends IMNode<ICachedMNode> {
CacheEntry getCacheEntry();
void setCacheEntry(CacheEntry cacheEntry);
+
+ LockEntry getLockEntry();
+
+ void setLockEntry(LockEntry lockEntry);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/basic/CachedBasicMNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/basic/CachedBasicMNode.java
index 31dc86e04da..a2542da64b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/basic/CachedBasicMNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/basic/CachedBasicMNode.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer;
import org.apache.iotdb.commons.schema.node.visitor.MNodeVisitor;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockEntry;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.CachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.info.CacheMNodeInfo;
@@ -224,6 +225,16 @@ public class CachedBasicMNode implements ICachedMNode {
cacheMNodeInfo.setCacheEntry(cacheEntry);
}
+ @Override
+ public LockEntry getLockEntry() {
+ return cacheMNodeInfo.getLockEntry();
+ }
+
+ @Override
+ public void setLockEntry(LockEntry lockEntry) {
+ cacheMNodeInfo.setLock(lockEntry);
+ }
+
/**
* The basic memory occupied by any CacheBasicMNode object
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
index 17c1edd8094..b9c9a3ac70e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
@@ -95,7 +95,11 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
}
@Override
- public ICachedMNode get(Object key) {
+ public synchronized ICachedMNode get(Object key) {
+ return internalGet(key);
+ }
+
+ private ICachedMNode internalGet(Object key) {
ICachedMNode result = get(childCache, key);
if (result != null) {
return result;
@@ -124,7 +128,7 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Override
public synchronized ICachedMNode putIfAbsent(String key, ICachedMNode value)
{
- ICachedMNode node = get(key);
+ ICachedMNode node = internalGet(key);
if (node == null) {
if (newChildBuffer == null) {
newChildBuffer = new ConcurrentHashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedAboveDatabaseMNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedAboveDatabaseMNode.java
index 39fa2a32970..f26ebb27309 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedAboveDatabaseMNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedAboveDatabaseMNode.java
@@ -20,6 +20,7 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.im
import org.apache.iotdb.commons.schema.node.common.AbstractAboveDatabaseMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockEntry;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.basic.CachedBasicMNode;
@@ -40,6 +41,16 @@ public class CachedAboveDatabaseMNode
basicMNode.setCacheEntry(cacheEntry);
}
+ @Override
+ public LockEntry getLockEntry() {
+ return basicMNode.getLockEntry();
+ }
+
+ @Override
+ public void setLockEntry(LockEntry lockEntry) {
+ basicMNode.setLockEntry(lockEntry);
+ }
+
@Override
public ICachedMNode getAsMNode() {
return this;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedDatabaseMNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedDatabaseMNode.java
index 18cc9e3ba7f..294562c6af2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedDatabaseMNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedDatabaseMNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.schema.node.info.IDeviceInfo;
import org.apache.iotdb.commons.schema.node.role.IInternalMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.DatabaseInfo;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockEntry;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
public class CachedDatabaseMNode
@@ -49,6 +50,16 @@ public class CachedDatabaseMNode
basicMNode.setCacheEntry(cacheEntry);
}
+ @Override
+ public LockEntry getLockEntry() {
+ return basicMNode.getLockEntry();
+ }
+
+ @Override
+ public void setLockEntry(LockEntry lockEntry) {
+ basicMNode.setLockEntry(lockEntry);
+ }
+
@Override
public ICachedMNode getAsMNode() {
return this;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedMeasurementMNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedMeasurementMNode.java
index 17340525cea..abae1277913 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedMeasurementMNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/impl/CachedMeasurementMNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.MeasurementInfo;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockEntry;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.basic.CachedBasicMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.CachedMNodeContainer;
@@ -48,6 +49,16 @@ public class CachedMeasurementMNode extends
AbstractMeasurementMNode<ICachedMNod
basicMNode.setCacheEntry(cacheEntry);
}
+ @Override
+ public LockEntry getLockEntry() {
+ return basicMNode.getLockEntry();
+ }
+
+ @Override
+ public void setLockEntry(LockEntry lockEntry) {
+ basicMNode.setLockEntry(lockEntry);
+ }
+
@Override
public ICachedMNode getAsMNode() {
return this;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
index 656b4f94a95..249f9110e52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/info/CacheMNodeInfo.java
@@ -20,10 +20,13 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.in
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.BasicMNodeInfo;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheEntry;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockEntry;
public class CacheMNodeInfo extends BasicMNodeInfo {
- private CacheEntry cacheEntry;
+ private volatile CacheEntry cacheEntry;
+
+ private volatile LockEntry lockEntry;
public CacheMNodeInfo(String name) {
super(name);
@@ -37,6 +40,14 @@ public class CacheMNodeInfo extends BasicMNodeInfo {
this.cacheEntry = cacheEntry;
}
+ public LockEntry getLockEntry() {
+ return lockEntry;
+ }
+
+ public void setLock(LockEntry lockEntry) {
+ this.lockEntry = lockEntry;
+ }
+
@Override
public int estimateSize() {
// Estimated size of CacheEntry = 40
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
index e9594be9017..6dd9491df39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
@@ -64,13 +64,14 @@ public class MockSchemaFile implements ISchemaFile {
}
@Override
- public boolean updateDatabaseNode(IDatabaseMNode<ICachedMNode> sgNode)
throws IOException {
+ public synchronized boolean updateDatabaseNode(IDatabaseMNode<ICachedMNode>
sgNode)
+ throws IOException {
this.storageGroupMNode =
cloneMNode(sgNode.getAsMNode()).getAsDatabaseMNode();
return true;
}
@Override
- public ICachedMNode getChildNode(ICachedMNode parent, String childName) {
+ public synchronized ICachedMNode getChildNode(ICachedMNode parent, String
childName) {
Map<String, ICachedMNode> segment = getSegment(parent);
ICachedMNode result = null;
if (segment != null) {
@@ -88,7 +89,7 @@ public class MockSchemaFile implements ISchemaFile {
}
@Override
- public Iterator<ICachedMNode> getChildren(ICachedMNode parent) {
+ public synchronized Iterator<ICachedMNode> getChildren(ICachedMNode parent) {
Map<String, ICachedMNode> segment = getSegment(parent);
if (segment == null) {
@@ -98,12 +99,12 @@ public class MockSchemaFile implements ISchemaFile {
}
@Override
- public boolean createSnapshot(File snapshotDir) {
+ public synchronized boolean createSnapshot(File snapshotDir) {
return false;
}
@Override
- public void writeMNode(ICachedMNode parent) {
+ public synchronized void writeMNode(ICachedMNode parent) {
ICachedMNodeContainer container = getCachedMNodeContainer(parent);
long address = container.getSegmentAddress();
if (container.isVolatile()) {
@@ -127,7 +128,7 @@ public class MockSchemaFile implements ISchemaFile {
}
@Override
- public void delete(ICachedMNode targetNode) {
+ public synchronized void delete(ICachedMNode targetNode) {
ICachedMNode removedNode =
getSegment(targetNode.getParent()).remove(targetNode.getName());
if (removedNode == null || removedNode.isMeasurement()) {
return;
@@ -152,7 +153,7 @@ public class MockSchemaFile implements ISchemaFile {
public void close() {}
@Override
- public void clear() {
+ public synchronized void clear() {
mockFile.clear();
}
@@ -185,6 +186,7 @@ public class MockSchemaFile implements ISchemaFile {
measurementMNode.getAlias())
.getAsMNode();
result.getAsMeasurementMNode().setOffset(measurementMNode.getOffset());
+
result.getAsMeasurementMNode().setPreDeleted(measurementMNode.isPreDeleted());
return result;
} else if (node.isDatabase() && node.isDevice()) {
ICachedMNode result =
@@ -196,6 +198,10 @@ public class MockSchemaFile implements ISchemaFile {
} else if (node.isDevice()) {
ICachedMNode result = nodeFactory.createDeviceMNode(null,
node.getName()).getAsMNode();
result.getAsDeviceMNode().setAligned(node.getAsDeviceMNode().isAlignedNullable());
+ result
+ .getAsDeviceMNode()
+
.setSchemaTemplateId(node.getAsDeviceMNode().getSchemaTemplateIdWithState());
+
result.getAsDeviceMNode().setUseTemplate(node.getAsDeviceMNode().isUseTemplate());
cloneInternalMNodeData(node, result);
return result;
} else if (node.isDatabase()) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index b005c62605c..2c9fdbdfd3d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.metadata.mtree.lock;
-import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.StampedWriterPreferredLock;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.StampedWriterPreferredLock;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
@@ -63,7 +63,7 @@ public class StampedWriterPreferredLockTest {
semaphore.acquire();
lock.writeLock();
counter.incrementAndGet();
- lock.unlockWrite();
+ lock.writeUnlock();
} catch (InterruptedException e) {
Assert.fail(e.getMessage());
}
@@ -93,7 +93,7 @@ public class StampedWriterPreferredLockTest {
semaphore.acquire();
lock.writeLock();
counter.incrementAndGet();
- lock.unlockWrite();
+ lock.writeUnlock();
} catch (InterruptedException e) {
Assert.fail(e.getMessage());
}
@@ -128,7 +128,7 @@ public class StampedWriterPreferredLockTest {
semaphore.acquire();
lock.writeLock();
counter.incrementAndGet();
- lock.unlockWrite();
+ lock.writeUnlock();
} catch (InterruptedException e) {
Assert.fail(e.getMessage());
}
@@ -156,7 +156,7 @@ public class StampedWriterPreferredLockTest {
.start();
// block reader util writer release write lock
Assert.assertEquals(0, counter.get());
- lock.unlockWrite();
+ lock.writeUnlock();
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> counter.get()
== 1);
Assert.assertEquals(1, counter.get());
}
@@ -172,7 +172,7 @@ public class StampedWriterPreferredLockTest {
// writer thread will be blocked util main thread release read
lock.
lock.writeLock();
counter.incrementAndGet();
- lock.unlockWrite();
+ lock.writeUnlock();
})
.start();
try {
@@ -234,7 +234,7 @@ public class StampedWriterPreferredLockTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- lock.unlockWrite();
+ lock.writeUnlock();
lock.writeLock();
counter1.incrementAndGet();
try {
@@ -243,7 +243,7 @@ public class StampedWriterPreferredLockTest {
throw new RuntimeException(e);
}
counter2.incrementAndGet();
- lock.unlockWrite();
+ lock.writeUnlock();
})
.start();
new Thread(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index cfa62dd4916..bef220b7c3a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.Arrays;
@@ -56,17 +55,15 @@ public class SchemaStatisticsTest extends
AbstractSchemaRegionTest {
}
@Test
- @Ignore
public void testPBTreeMemoryStatistics() throws Exception {
ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
- ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 0);
ISchemaEngineStatistics engineStatistics =
SchemaEngine.getInstance().getSchemaEngineStatistics();
SchemaRegionTestUtil.createSimpleTimeseriesByList(
schemaRegion1, Arrays.asList("root.sg1.n.s0", "root.sg1.n.v.d1.s1",
"root.sg1.n.v.d2.s2"));
SchemaRegionTestUtil.createSimpleTimeseriesByList(
- schemaRegion2, Arrays.asList("root.sg2.d0.s0"));
+ schemaRegion1, Arrays.asList("root.sg1.d0.s0"));
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(new PartialPath("root.**.s1"));
patternTree.appendPathPattern(new PartialPath("root.**.s2"));