This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bc8d866f4e8 PBTree: Implement dual-buffer container for MNode
management (#12048)
bc8d866f4e8 is described below
commit bc8d866f4e8290485fe97c267d7aacc9f339260e
Author: linxt20 <[email protected]>
AuthorDate: Fri Feb 23 18:48:25 2024 +0800
PBTree: Implement dual-buffer container for MNode management (#12048)
---
.../impl/pbtree/flush/PBTreeFlushExecutor.java | 9 +
.../mtree/impl/pbtree/memory/MemoryManager.java | 37 ++-
.../mnode/container/CachedMNodeContainer.java | 148 ++++-----
.../mnode/container/ICachedMNodeContainer.java | 18 +-
.../pbtree/mnode/container/IMNodeChildBuffer.java | 45 +++
.../pbtree/mnode/container/MNodeChildBuffer.java | 331 +++++++++++++++++++++
.../mnode/container/MNodeNewChildBuffer.java | 75 +++++
.../mnode/container/MNodeUpdateChildBuffer.java | 83 ++++++
.../impl/pbtree/schemafile/MockSchemaFile.java | 6 +-
.../pbtree/schemafile/pagemgr/PageManager.java | 9 +-
.../metadata/container/MNodeChildBufferTest.java | 125 ++++++++
.../mtree/schemafile/SchemaFileLogTest.java | 4 +
.../metadata/mtree/schemafile/SchemaFileTest.java | 129 ++++----
.../iotdb/db/tools/PBTreeFileSketchTest.java | 5 +
14 files changed, 857 insertions(+), 167 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
index 40bf66320ef..858e1912618 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/PBTreeFlushExecutor.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.IMemoryManager;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getCachedMNodeContainer;
+
public class PBTreeFlushExecutor {
private static final Logger logger =
LoggerFactory.getLogger(PBTreeFlushExecutor.class);
@@ -126,7 +129,10 @@ public class PBTreeFlushExecutor {
Iterator<ICachedMNode> volatileSubtreeIterator;
List<ICachedMNode> collectedVolatileSubtrees;
try {
+ ICachedMNodeContainer container = getCachedMNodeContainer(subtreeRoot);
+ container.transferAllBufferReceivingToFlushing();
file.writeMNode(subtreeRoot);
+
flushNodeNum.incrementAndGet();
flushMemSize.addAndGet(subtreeRoot.estimateSize());
volatileSubtreeIterator =
@@ -160,7 +166,10 @@ public class PBTreeFlushExecutor {
subtreeRoot = subtreeIterator.next();
try {
+ ICachedMNodeContainer container = getCachedMNodeContainer(subtreeRoot);
+ container.transferAllBufferReceivingToFlushing();
file.writeMNode(subtreeRoot);
+
flushNodeNum.incrementAndGet();
flushMemSize.addAndGet(subtreeRoot.estimateSize());
collectedVolatileSubtrees = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/MemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/MemoryManager.java
index 22b3f139a20..f0b793864db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/MemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/MemoryManager.java
@@ -292,13 +292,14 @@ public class MemoryManager implements IMemoryManager {
private class VolatileSubtreeIterator implements Iterator<ICachedMNode> {
private final ICachedMNodeContainer container;
- private final Iterator<ICachedMNode> bufferedNodeIterator;
-
+ private Iterator<ICachedMNode> bufferedNodeIterator;
+ private byte status;
private ICachedMNode nextSubtree = null;
private VolatileSubtreeIterator(ICachedMNodeContainer container) {
this.container = container;
- this.bufferedNodeIterator = container.getChildrenBufferIterator();
+ this.bufferedNodeIterator =
container.getNewChildFlushingBuffer().values().iterator();
+ this.status = 0;
}
@Override
@@ -322,6 +323,12 @@ public class MemoryManager implements IMemoryManager {
private void tryGetNext() {
ICachedMNode node;
CacheEntry cacheEntry;
+ if (!bufferedNodeIterator.hasNext() && status == 0) {
+ // flushingBuffer of NewChildBuffer has been traversed, and the
flushingBuffer of
+ // UpdateChildBuffer needs to be traversed.
+ bufferedNodeIterator =
container.getUpdatedChildFlushingBuffer().values().iterator();
+ status = 1;
+ }
while (bufferedNodeIterator.hasNext()) {
node = bufferedNodeIterator.next();
@@ -338,9 +345,28 @@ public class MemoryManager implements IMemoryManager {
cacheEntry = getCacheEntry(node);
synchronized (cacheEntry) {
+ if (status == 1
+ &&
container.getUpdatedChildReceivingBuffer().containsKey(node.getName())) {
+ if (cacheEntry.hasVolatileDescendant()
+ && getCachedMNodeContainer(node).hasChildrenInBuffer()) {
+ // these two factor judgement is not redundant because the
#hasVolatileDescendant is
+ // on a higher priority than #container.hasChildren
+
+ // nodes with volatile children should be treated as root of
volatile subtree and
+ // return for flush
+ nextSubtree = node;
+ unlockImmediately = false;
+ }
+ return;
+ }
+
cacheEntry.setVolatile(false);
memoryStatistics.removeVolatileNode();
- container.moveMNodeToCache(node.getName());
+ if (status == 1) {
+ container.moveMNodeFromUpdateChildBufferToCache(node.getName());
+ } else {
+ container.moveMNodeFromNewChildBufferToCache(node.getName());
+ }
if (cacheEntry.hasVolatileDescendant()
&& getCachedMNodeContainer(node).hasChildrenInBuffer()) {
@@ -348,8 +374,7 @@ public class MemoryManager implements IMemoryManager {
// on a higher priority than #container.hasChildren
// nodes with volatile children should be treated as root of
volatile subtree and
- // return
- // for flush
+ // return for flush
nextSubtree = node;
unlockImmediately = false;
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
index b9c9a3ac70e..20987ac41e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
@@ -38,14 +38,15 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import static java.util.Collections.emptySet;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.MNodeChildBuffer.emptyMNodeChildBuffer;
public class CachedMNodeContainer implements ICachedMNodeContainer {
private long segmentAddress = -1;
private Map<String, ICachedMNode> childCache = null;
- private Map<String, ICachedMNode> newChildBuffer = null;
- private Map<String, ICachedMNode> updatedChildBuffer = null;
+ private MNodeNewChildBuffer newChildBuffer = null;
+ private MNodeUpdateChildBuffer updatedChildBuffer = null;
private static final IMNodeContainer<ICachedMNode> EMPTY_CONTAINER =
new CachedMNodeContainer.EmptyContainer();
@@ -119,7 +120,7 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Override
public synchronized ICachedMNode put(String key, ICachedMNode value) {
if (newChildBuffer == null) {
- newChildBuffer = new ConcurrentHashMap<>();
+ newChildBuffer = new MNodeNewChildBuffer();
}
return newChildBuffer.put(key, value);
}
@@ -127,15 +128,13 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Nullable
@Override
public synchronized ICachedMNode putIfAbsent(String key, ICachedMNode value)
{
-
ICachedMNode node = internalGet(key);
if (node == null) {
if (newChildBuffer == null) {
- newChildBuffer = new ConcurrentHashMap<>();
+ newChildBuffer = new MNodeNewChildBuffer();
}
node = newChildBuffer.put(key, value);
}
-
return node;
}
@@ -158,7 +157,7 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Override
public synchronized void putAll(@Nonnull Map<? extends String, ? extends
ICachedMNode> m) {
if (newChildBuffer == null) {
- newChildBuffer = new ConcurrentHashMap<>();
+ newChildBuffer = new MNodeNewChildBuffer();
}
newChildBuffer.putAll(m);
}
@@ -215,18 +214,7 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Nullable
@Override
public synchronized ICachedMNode replace(String key, ICachedMNode value) {
- ICachedMNode replacedOne = replace(childCache, key, value);
- if (replacedOne == null) {
- replacedOne = replace(newChildBuffer, key, value);
- }
- if (replacedOne == null) {
- replacedOne = replace(updatedChildBuffer, key, value);
- }
- return replacedOne;
- }
-
- private ICachedMNode replace(Map<String, ICachedMNode> map, String key,
ICachedMNode value) {
- return map == null ? null : map.replace(key, value);
+ throw new UnsupportedOperationException();
}
@Override
@@ -244,19 +232,6 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
return segmentAddress == -1;
}
- @Override
- public boolean isFull() {
- return true;
- }
-
- @Override
- public boolean isExpelled() {
- return !isVolatile()
- && isEmpty(childCache)
- && isEmpty(newChildBuffer)
- && isEmpty(updatedChildBuffer);
- }
-
@Override
public boolean hasChildInNewChildBuffer(String name) {
return containsKey(newChildBuffer, name);
@@ -274,32 +249,48 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Override
public Iterator<ICachedMNode> getChildrenIterator() {
- return new CachedMNodeContainerIterator();
+ return new CachedMNodeContainerIterator((byte) 0);
}
@Override
public Iterator<ICachedMNode> getChildrenBufferIterator() {
- return new BufferIterator();
+ return new CachedMNodeContainerIterator((byte) 1);
}
@Override
- public Iterator<ICachedMNode> getNewChildBufferIterator() {
- return getNewChildBuffer().values().iterator();
+ public Map<String, ICachedMNode> getChildCache() {
+ return childCache == null ? Collections.emptyMap() : childCache;
}
@Override
- public Map<String, ICachedMNode> getChildCache() {
- return childCache == null ? Collections.emptyMap() : childCache;
+ public IMNodeChildBuffer getNewChildBuffer() {
+ return newChildBuffer == null ? emptyMNodeChildBuffer() : newChildBuffer;
}
@Override
- public Map<String, ICachedMNode> getNewChildBuffer() {
- return newChildBuffer == null ? Collections.emptyMap() : newChildBuffer;
+ public IMNodeChildBuffer getUpdatedChildBuffer() {
+ return updatedChildBuffer == null ? emptyMNodeChildBuffer() :
updatedChildBuffer;
}
@Override
- public Map<String, ICachedMNode> getUpdatedChildBuffer() {
- return updatedChildBuffer == null ? Collections.emptyMap() :
updatedChildBuffer;
+ public Map<String, ICachedMNode> getNewChildFlushingBuffer() {
+ return getNewChildBuffer().getFlushingBuffer();
+ }
+
+ @Override
+ public Map<String, ICachedMNode> getUpdatedChildFlushingBuffer() {
+ return getUpdatedChildBuffer().getFlushingBuffer();
+ }
+
+ @Override
+ public Map<String, ICachedMNode> getUpdatedChildReceivingBuffer() {
+ return getUpdatedChildBuffer().getReceivingBuffer();
+ }
+
+ @Override
+ public void transferAllBufferReceivingToFlushing() {
+ getNewChildBuffer().transferReceivingBufferToFlushingBuffer();
+ getUpdatedChildBuffer().transferReceivingBufferToFlushingBuffer();
}
@Override
@@ -325,7 +316,7 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Override
public synchronized void appendMNode(ICachedMNode node) {
if (newChildBuffer == null) {
- newChildBuffer = new ConcurrentHashMap<>();
+ newChildBuffer = new MNodeNewChildBuffer();
}
newChildBuffer.put(node.getName(), node);
}
@@ -335,18 +326,24 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
ICachedMNode node = removeFromMap(childCache, name);
if (node != null) {
if (updatedChildBuffer == null) {
- updatedChildBuffer = new ConcurrentHashMap<>();
+ updatedChildBuffer = new MNodeUpdateChildBuffer();
}
updatedChildBuffer.put(name, node);
}
}
@Override
- public synchronized void moveMNodeToCache(String name) {
- ICachedMNode node = removeFromMap(newChildBuffer, name);
- if (node == null) {
- node = removeFromMap(updatedChildBuffer, name);
+ public synchronized void moveMNodeFromNewChildBufferToCache(String name) {
+ ICachedMNode node = getNewChildBuffer().removeFromFlushingBuffer(name);
+ if (childCache == null) {
+ childCache = new ConcurrentHashMap<>();
}
+ childCache.put(name, node);
+ }
+
+ @Override
+ public synchronized void moveMNodeFromUpdateChildBufferToCache(String name) {
+ ICachedMNode node = getUpdatedChildBuffer().removeFromFlushingBuffer(name);
if (childCache == null) {
childCache = new ConcurrentHashMap<>();
}
@@ -383,10 +380,11 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
private class CachedMNodeContainerIterator implements Iterator<ICachedMNode>
{
Iterator<ICachedMNode> iterator;
- byte status = 0;
+ byte status;
- CachedMNodeContainerIterator() {
- iterator = getChildCache().values().iterator();
+ CachedMNodeContainerIterator(byte status) {
+ this.status = status;
+ changeStatus();
}
@Override
@@ -410,59 +408,23 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
private boolean changeStatus() {
switch (status) {
case 0:
- iterator = getNewChildBuffer().values().iterator();
+ iterator = getChildCache().values().iterator();
status = 1;
return true;
case 1:
- iterator = getUpdatedChildBuffer().values().iterator();
+ iterator = getNewChildBuffer().getMNodeChildBufferIterator();
status = 2;
return true;
+ case 2:
+ iterator = getUpdatedChildBuffer().getMNodeChildBufferIterator();
+ status = 3;
+ return true;
default:
return false;
}
}
}
- private class BufferIterator implements Iterator<ICachedMNode> {
- Iterator<ICachedMNode> iterator;
- Iterator<ICachedMNode> newBufferIterator;
- Iterator<ICachedMNode> updateBufferIterator;
- byte status = 0;
-
- BufferIterator() {
- newBufferIterator = getNewChildBuffer().values().iterator();
- updateBufferIterator = getUpdatedChildBuffer().values().iterator();
- iterator = newBufferIterator;
- }
-
- @Override
- public boolean hasNext() {
- if (iterator.hasNext()) {
- return true;
- }
- while (!iterator.hasNext()) {
- if (!changeStatus()) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public ICachedMNode next() {
- return iterator.next();
- }
-
- private boolean changeStatus() {
- if (status == 0) {
- iterator = updateBufferIterator;
- status = 1;
- return true;
- }
- return false;
- }
- }
-
private static class EmptyContainer extends AbstractMap<String, ICachedMNode>
implements IMNodeContainer<ICachedMNode> {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/ICachedMNodeContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/ICachedMNodeContainer.java
index b28bc06ce4e..4029aa8ef4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/ICachedMNodeContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/ICachedMNodeContainer.java
@@ -32,10 +32,6 @@ public interface ICachedMNodeContainer extends
IMNodeContainer<ICachedMNode> {
boolean isVolatile();
- boolean isFull();
-
- boolean isExpelled();
-
boolean hasChildInNewChildBuffer(String name);
boolean hasChildInBuffer(String name);
@@ -46,14 +42,20 @@ public interface ICachedMNodeContainer extends
IMNodeContainer<ICachedMNode> {
Iterator<ICachedMNode> getChildrenBufferIterator();
- Iterator<ICachedMNode> getNewChildBufferIterator();
-
Map<String, ICachedMNode> getChildCache();
Map<String, ICachedMNode> getNewChildBuffer();
Map<String, ICachedMNode> getUpdatedChildBuffer();
+ Map<String, ICachedMNode> getNewChildFlushingBuffer();
+
+ Map<String, ICachedMNode> getUpdatedChildFlushingBuffer();
+
+ Map<String, ICachedMNode> getUpdatedChildReceivingBuffer();
+
+ void transferAllBufferReceivingToFlushing();
+
void loadChildrenFromDisk(Map<String, ICachedMNode> children);
void addChildToCache(ICachedMNode node);
@@ -62,7 +64,9 @@ public interface ICachedMNodeContainer extends
IMNodeContainer<ICachedMNode> {
void updateMNode(String name);
- void moveMNodeToCache(String name);
+ void moveMNodeFromNewChildBufferToCache(String name);
+
+ void moveMNodeFromUpdateChildBufferToCache(String name);
void evictMNode(String name);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/IMNodeChildBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/IMNodeChildBuffer.java
new file mode 100644
index 00000000000..ee46149fc89
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/IMNodeChildBuffer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
+
+import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public interface IMNodeChildBuffer extends IMNodeContainer<ICachedMNode> {
+
+ // get an iterator over the whole Buffer, which is a sorted merge of the
ReceivingBuffer and the
+ // FlushingBuffer
+ Iterator<ICachedMNode> getMNodeChildBufferIterator();
+
+ // only get the FlushingBuffer; there shall be no write operations on the
returned map instance.
+ Map<String, ICachedMNode> getFlushingBuffer();
+
+ // only get the ReceivingBuffer; there shall be no write operations on the
returned map instance.
+ Map<String, ICachedMNode> getReceivingBuffer();
+
+ // Before flushing, use this to transfer ReceivingBuffer to FlushingBuffer
+ void transferReceivingBufferToFlushingBuffer();
+
+ // After flushing, use this to clear the flushed node
+ ICachedMNode removeFromFlushingBuffer(Object key);
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
new file mode 100644
index 00000000000..838310f558e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Collections.emptySet;
+
+public abstract class MNodeChildBuffer implements IMNodeChildBuffer {
+
+ protected Map<String, ICachedMNode> flushingBuffer; // Store old data nodes
for disk flushing
+ protected Map<String, ICachedMNode> receivingBuffer; // Store newly created
or modified data nodes
+
+ protected int totalSize =
+ 0; // The total size is the union size of flushingBuffer and
receivingBuffer.
+
+ private static final IMNodeChildBuffer EMPTY_BUFFER = new
MNodeChildBuffer.EmptyBuffer();
+
+ public static IMNodeChildBuffer emptyMNodeChildBuffer() {
+ return EMPTY_BUFFER;
+ }
+
+ @Override
+ public Iterator<ICachedMNode> getMNodeChildBufferIterator() {
+ return new MNodeChildBufferIterator();
+ }
+
+ @Override
+ public Map<String, ICachedMNode> getFlushingBuffer() {
+ return flushingBuffer == null ? Collections.emptyMap() : flushingBuffer;
+ }
+
+ @Override
+ public Map<String, ICachedMNode> getReceivingBuffer() {
+ return receivingBuffer == null ? Collections.emptyMap() : receivingBuffer;
+ }
+
+ @Override
+ public void transferReceivingBufferToFlushingBuffer() {
+ if (flushingBuffer == null) {
+ flushingBuffer = new ConcurrentHashMap<>();
+ }
+ if (receivingBuffer != null) {
+ flushingBuffer.putAll(receivingBuffer);
+ receivingBuffer.clear();
+ }
+ }
+
+ @Override
+ public int size() {
+ return totalSize;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return totalSize == 0;
+ }
+
+ private boolean containKey(Map<String, ICachedMNode> map, Object key) {
+ return map != null && map.containsKey(key);
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return containKey(flushingBuffer, key) || containKey(receivingBuffer, key);
+ }
+
+ private boolean containValue(Map<String, ICachedMNode> map, Object value) {
+ return map != null && map.containsValue(value);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return containValue(flushingBuffer, value) ||
containValue(receivingBuffer, value);
+ }
+
+ protected ICachedMNode get(Map<String, ICachedMNode> map, Object key) {
+ return map != null ? map.get(key) : null;
+ }
+
+ @Override
+ public synchronized ICachedMNode get(Object key) {
+ ICachedMNode result = get(receivingBuffer, key);
+ if (result != null) {
+ return result;
+ } else {
+ return get(flushingBuffer, key);
+ }
+ }
+
+ private ICachedMNode remove(Map<String, ICachedMNode> map, Object key) {
+ return map == null ? null : map.remove(key);
+ }
+
+ @Override
+ public synchronized ICachedMNode remove(Object key) {
+ // There are some duplicate keys in receivingBuffer and flushingBuffer.
+ ICachedMNode result1 = remove(flushingBuffer, key);
+ ICachedMNode result2 = remove(receivingBuffer, key);
+ // If the first one is null, then look at the second result; if the first
one is not null,
+ // then the second one is either null or the same as the first one
+ ICachedMNode result = result1 != null ? result1 : result2;
+ if (result != null) {
+ totalSize--;
+ }
+ return result;
+ }
+
+ @Override
+ public void clear() {
+ if (receivingBuffer != null) {
+ receivingBuffer.clear();
+ }
+ if (flushingBuffer != null) {
+ flushingBuffer.clear();
+ }
+ totalSize = 0;
+ }
+
+ private Set<String> keySet(Map<String, ICachedMNode> map) {
+ return map == null ? Collections.emptySet() : map.keySet();
+ }
+
+ @Nonnull
+ @Override
+ public Set<String> keySet() {
+ Set<String> result = new TreeSet<>();
+ // This is a set structure. If there are duplicates, set will
automatically remove them.
+ result.addAll(keySet(receivingBuffer));
+ result.addAll(keySet(flushingBuffer));
+ return result;
+ }
+
+ private Collection<ICachedMNode> values(Map<String, ICachedMNode> map) {
+ return map == null ? Collections.emptyList() : map.values();
+ }
+
+ @Nonnull
+ @Override
+ public Collection<ICachedMNode> values() {
+ Collection<ICachedMNode> result = new HashSet<>();
+ result.addAll(values(flushingBuffer));
+ result.addAll(values(receivingBuffer));
+ return result;
+ }
+
+ private Set<Entry<String, ICachedMNode>> entrySet(Map<String, ICachedMNode>
map) {
+ return map == null ? Collections.emptySet() : map.entrySet();
+ }
+
+ @Nonnull
+ @Override
+ public Set<Entry<String, ICachedMNode>> entrySet() {
+ Set<Entry<String, ICachedMNode>> result = new HashSet<>();
+ // HashSet will automatically remove duplicates
+ result.addAll(entrySet(receivingBuffer));
+ result.addAll(entrySet(flushingBuffer));
+ return result;
+ }
+
+ @Nullable
+ @Override
+ public synchronized ICachedMNode replace(String key, ICachedMNode value) {
+ throw new UnsupportedOperationException();
+ }
+
+ private class MNodeChildBufferIterator implements Iterator<ICachedMNode> {
+
+ int flushingIndex = 0;
+ int receivingIndex = 0;
+ List<ICachedMNode> flushingBufferList;
+ List<ICachedMNode> receivingBufferList;
+
+ MNodeChildBufferIterator() {
+ // use merge sort to merge them and remove duplicates.
+ List<ICachedMNode> list = new ArrayList<>();
+ receivingBufferList = new ArrayList<>(getReceivingBuffer().values());
+ flushingBufferList = new ArrayList<>(getFlushingBuffer().values());
+ receivingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
+ flushingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
+ }
+
+ private ICachedMNode tryGetNext() {
+ // There are only three situations here: both lists have nodes left, or only
one of the two has nodes left;
+ // in each case, merge step by step and skip duplicates.
+ if (receivingIndex < receivingBufferList.size()
+ && flushingIndex < flushingBufferList.size()) {
+ ICachedMNode node1 = receivingBufferList.get(receivingIndex);
+ ICachedMNode node2 = flushingBufferList.get(flushingIndex);
+ if (node1.getName().compareTo(node2.getName()) < 0) {
+ receivingIndex++;
+ return node1;
+ } else if (node1.getName().compareTo(node2.getName()) > 0) {
+ flushingIndex++;
+ return node2;
+ } else {
+ receivingIndex++;
+ flushingIndex++;
+ return node1;
+ }
+ }
+ if (receivingIndex < receivingBufferList.size()) {
+ return receivingBufferList.get(receivingIndex++);
+ }
+ if (flushingIndex < flushingBufferList.size()) {
+ return flushingBufferList.get(flushingIndex++);
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return flushingIndex < flushingBufferList.size()
+ || receivingIndex < receivingBufferList.size();
+ }
+
+ @Override
+ public ICachedMNode next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return tryGetNext();
+ }
+ }
+
+ private static class EmptyBuffer extends AbstractMap<String, ICachedMNode>
+ implements IMNodeChildBuffer {
+
+ @Override
+ public Iterator<ICachedMNode> getMNodeChildBufferIterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public Map<String, ICachedMNode> getFlushingBuffer() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map<String, ICachedMNode> getReceivingBuffer() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void transferReceivingBufferToFlushingBuffer() {
+ // Do nothing
+ }
+
+ @Override
+ public ICachedMNode removeFromFlushingBuffer(Object key) {
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public Set<Entry<String, ICachedMNode>> entrySet() {
+ return emptySet();
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return false;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return false;
+ }
+
+ @Override
+ public ICachedMNode get(Object key) {
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public Set<String> keySet() {
+ return emptySet();
+ }
+
+ @Nonnull
+ @Override
+ public Collection<ICachedMNode> values() {
+ return emptySet();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeNewChildBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeNewChildBuffer.java
new file mode 100644
index 00000000000..739d3c9fcae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeNewChildBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MNodeNewChildBuffer extends MNodeChildBuffer {
+ @Nullable
+ @Override
+ public synchronized ICachedMNode put(String key, ICachedMNode value) {
+ if (receivingBuffer == null) {
+ receivingBuffer = new ConcurrentHashMap<>();
+ }
+ totalSize++;
+ return receivingBuffer.put(key, value);
+ }
+
+ @Nullable
+ @Override
+ public synchronized ICachedMNode putIfAbsent(String key, ICachedMNode value)
{
+ ICachedMNode result = get(receivingBuffer, key);
+ if (result == null) {
+ if (receivingBuffer == null) {
+ receivingBuffer = new ConcurrentHashMap<>();
+ }
+ totalSize++;
+ result = receivingBuffer.put(key, value);
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized void putAll(@Nonnull Map<? extends String, ? extends
ICachedMNode> m) {
+ if (receivingBuffer == null) {
+ receivingBuffer = new ConcurrentHashMap<>();
+ }
+ totalSize += m.size();
+ receivingBuffer.putAll(m);
+ }
+
+ @Override
+ public synchronized ICachedMNode removeFromFlushingBuffer(Object key) {
+ if (flushingBuffer == null) {
+ return null;
+ }
+ ICachedMNode result = flushingBuffer.remove(key);
+ if (result != null) {
+ totalSize--;
+ }
+ return result;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeUpdateChildBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeUpdateChildBuffer.java
new file mode 100644
index 00000000000..994fb724a79
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeUpdateChildBuffer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
+
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MNodeUpdateChildBuffer extends MNodeChildBuffer {
+ @Nullable
+ @Override
+ public synchronized ICachedMNode put(String key, ICachedMNode value) {
+ if (receivingBuffer == null) {
+ receivingBuffer = new ConcurrentHashMap<>();
+ }
+ if (flushingBuffer == null || !flushingBuffer.containsKey(key)) {
+ totalSize++;
+ }
+ return receivingBuffer.put(key, value);
+ }
+
+ @Nullable
+ @Override
+ public synchronized ICachedMNode putIfAbsent(String key, ICachedMNode value)
{
+ ICachedMNode result = get(receivingBuffer, key);
+ if (result == null) {
+ if (receivingBuffer == null) {
+ receivingBuffer = new ConcurrentHashMap<>();
+ }
+ if (flushingBuffer == null || !flushingBuffer.containsKey(key)) {
+ totalSize++;
+ }
+ result = receivingBuffer.put(key, value);
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized void putAll(@Nonnull Map<? extends String, ? extends
ICachedMNode> m) {
+ if (receivingBuffer == null) {
+ receivingBuffer = new ConcurrentHashMap<>();
+ }
+ for (Entry<? extends String, ? extends ICachedMNode> entry : m.entrySet())
{
+ if (flushingBuffer == null ||
!flushingBuffer.containsKey(entry.getKey())) {
+ totalSize++;
+ }
+ receivingBuffer.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public synchronized ICachedMNode removeFromFlushingBuffer(Object key) {
+ if (flushingBuffer == null) {
+ return null;
+ }
+ ICachedMNode result = flushingBuffer.remove(key);
+ if (result != null && (receivingBuffer == null ||
!receivingBuffer.containsKey(key))) {
+ totalSize--;
+ }
+ return result;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
index 6dd9491df39..7b71eb49770 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/MockSchemaFile.java
@@ -59,6 +59,8 @@ public class MockSchemaFile implements ISchemaFile {
null,
storageGroupPath.getTailNode(),
CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs());
+ ICachedMNodeContainer container =
getCachedMNodeContainer(storageGroupMNode.getAsMNode());
+ container.transferAllBufferReceivingToFlushing();
writeMNode(storageGroupMNode.getAsMNode());
return cloneMNode(storageGroupMNode.getAsMNode());
}
@@ -111,8 +113,8 @@ public class MockSchemaFile implements ISchemaFile {
address = allocateSegment();
container.setSegmentAddress(address);
}
- write(address, container.getUpdatedChildBuffer());
- write(address, container.getNewChildBuffer());
+ write(address, container.getUpdatedChildFlushingBuffer());
+ write(address, container.getNewChildFlushingBuffer());
}
private void write(long address, Map<String, ICachedMNode> nodeMap) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 95a47533ca8..7e0b6923673 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -179,7 +179,8 @@ public abstract class PageManager implements IPageManager {
String alias;
// TODO: reserve order of insert in container may be better
for (Map.Entry<String, ICachedMNode> entry :
-
ICachedMNodeContainer.getCachedMNodeContainer(node).getNewChildBuffer().entrySet().stream()
+
ICachedMNodeContainer.getCachedMNodeContainer(node).getNewChildFlushingBuffer().entrySet()
+ .stream()
.sorted(Map.Entry.comparingByKey())
.collect(Collectors.toList())) {
// check and pre-allocate
@@ -248,7 +249,7 @@ public abstract class PageManager implements IPageManager {
reEstimateSegSize(
curPage.getAsSegmentedPage().getSegmentSize(actSegId) +
childBuffer.capacity(),
ICachedMNodeContainer.getCachedMNodeContainer(node)
- .getNewChildBuffer()
+ .getNewChildFlushingBuffer()
.entrySet()
.size());
ISegmentedPage newPage = getMinApplSegmentedPageInMem(newSegSize,
cxt);
@@ -283,7 +284,9 @@ public abstract class PageManager implements IPageManager {
ByteBuffer childBuffer;
long res; // result of update
for (Map.Entry<String, ICachedMNode> entry :
-
ICachedMNodeContainer.getCachedMNodeContainer(node).getUpdatedChildBuffer().entrySet())
{
+ ICachedMNodeContainer.getCachedMNodeContainer(node)
+ .getUpdatedChildFlushingBuffer()
+ .entrySet()) {
child = entry.getValue();
actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
childBuffer = RecordUtils.node2Buffer(child);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/container/MNodeChildBufferTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/container/MNodeChildBufferTest.java
new file mode 100644
index 00000000000..568493c55be
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/container/MNodeChildBufferTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.container;
+
+import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.MNodeChildBuffer;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.MNodeUpdateChildBuffer;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MNodeChildBufferTest { // walks an MNodeUpdateChildBuffer through put / transfer / remove / clear
+
+ private final IMNodeFactory<ICachedMNode> nodeFactory =
+ MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
+
+ @Test
+ public void testMNodeChildBuffer() {
+ ICachedMNode rootNode = nodeFactory.createInternalMNode(null, "root");
+
+ ICachedMNode speedNode =
+ rootNode
+ .addChild(nodeFactory.createInternalMNode(null, "sg1"))
+ .addChild(nodeFactory.createInternalMNode(null, "device"))
+ .addChild(nodeFactory.createInternalMNode(null, "speed"));
+ assertEquals("root.sg1.device.speed", speedNode.getFullPath());
+
+ ICachedMNode temperatureNode =
+ rootNode
+ .getChild("sg1")
+ .addChild(nodeFactory.createInternalMNode(null, "device11"))
+ .addChild(nodeFactory.createInternalMNode(null, "temperature"));
+ assertEquals("root.sg1.device11.temperature",
temperatureNode.getFullPath());
+
+ MNodeChildBuffer buffer = new MNodeUpdateChildBuffer(); // a fresh buffer reports both halves as empty
+ assertTrue(buffer.getReceivingBuffer().isEmpty());
+ assertTrue(buffer.getFlushingBuffer().isEmpty());
+ assertTrue(buffer.isEmpty());
+
+ buffer.put("root.sg1.device.speed", speedNode); // a put lands in the receiving buffer only
+ assertEquals(1, buffer.getReceivingBuffer().size());
+ assertEquals(0, buffer.getFlushingBuffer().size());
+ assertEquals(1, buffer.size());
+
+ buffer.transferReceivingBufferToFlushingBuffer(); // entry moves to the flushing buffer, total size unchanged
+ assertEquals(0, buffer.getReceivingBuffer().size());
+ assertEquals(1, buffer.getFlushingBuffer().size());
+ assertEquals(1, buffer.size());
+
+ buffer.put("root.sg1.device.speed", speedNode); // same key now sits in both buffers but is counted once
+ assertEquals(1, buffer.getReceivingBuffer().size());
+ assertEquals(1, buffer.getFlushingBuffer().size());
+ assertEquals(1, buffer.size());
+
+ buffer.put("root.sg1.device11.temperature", temperatureNode);
+ // check containsKey and containsValue
+ assertTrue(buffer.containsKey("root.sg1.device.speed"));
+ assertTrue(buffer.containsKey("root.sg1.device11.temperature"));
+ assertTrue(buffer.containsValue(speedNode));
+ assertTrue(buffer.containsValue(temperatureNode));
+ // check keySet, values and entrySet: the key held by both buffers appears once
+ assertEquals(2, buffer.keySet().size());
+ assertEquals(2, buffer.values().size());
+ assertEquals(2, buffer.entrySet().size());
+ // check the iterator over the buffered children
+ // obtain the iterator
+ Iterator<ICachedMNode> iterator = buffer.getMNodeChildBufferIterator();
+ // expect exactly two nodes: speedNode first, then temperatureNode
+ assertTrue(iterator.hasNext());
+ assertEquals(speedNode, iterator.next());
+ assertTrue(iterator.hasNext());
+ assertEquals(temperatureNode, iterator.next());
+ assertFalse(iterator.hasNext());
+
+ // check get
+ assertEquals(speedNode, buffer.get("root.sg1.device.speed"));
+ assertEquals(temperatureNode, buffer.get("root.sg1.device11.temperature"));
+
+ // check remove: the key disappears from both buffers, only temperatureNode stays
+ buffer.remove("root.sg1.device.speed");
+ assertEquals(1, buffer.getReceivingBuffer().size());
+ assertEquals(0, buffer.getFlushingBuffer().size());
+ assertEquals(1, buffer.size());
+
+ // check removeFromFlushingBuffer: the copy in the receiving buffer keeps the key counted
+ buffer.transferReceivingBufferToFlushingBuffer();
+ buffer.putIfAbsent("root.sg1.device11.temperature", temperatureNode);
+ buffer.removeFromFlushingBuffer("root.sg1.device11.temperature");
+ assertEquals(1, buffer.getReceivingBuffer().size());
+ assertEquals(0, buffer.getFlushingBuffer().size());
+ assertEquals(1, buffer.size());
+
+ // check clear: both buffers are emptied even when the key is held by both
+ buffer.transferReceivingBufferToFlushingBuffer();
+ buffer.putIfAbsent("root.sg1.device11.temperature", temperatureNode);
+ buffer.clear();
+ assertTrue(buffer.getReceivingBuffer().isEmpty());
+ assertTrue(buffer.getFlushingBuffer().isEmpty());
+ assertTrue(buffer.isEmpty());
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index 83f72b8e5f3..382cacd8de8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
@@ -49,6 +50,7 @@ import java.util.Iterator;
import static
org.apache.iotdb.db.metadata.mtree.schemafile.SchemaFileTest.getSegAddrInContainer;
import static
org.apache.iotdb.db.metadata.mtree.schemafile.SchemaFileTest.getTreeBFT;
import static
org.apache.iotdb.db.metadata.mtree.schemafile.SchemaFileTest.virtualTriangleMTree;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getCachedMNodeContainer;
import static org.junit.Assert.fail;
public class SchemaFileLogTest {
@@ -95,6 +97,8 @@ public class SchemaFileLogTest {
while (ite.hasNext()) {
ICachedMNode curNode = ite.next();
if (!curNode.isMeasurement()) {
+ ICachedMNodeContainer container = getCachedMNodeContainer(curNode);
+ container.transferAllBufferReceivingToFlushing();
sf.writeMNode(curNode);
lastNode = curNode;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index c28029ee431..c0bcecb2aa5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -59,6 +59,8 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getCachedMNodeContainer;
+
public class SchemaFileTest {
private static final int TEST_SCHEMA_REGION_ID = 0;
@@ -81,6 +83,13 @@ public class SchemaFileTest {
.setSchemaEngineMode(SchemaEngineMode.Memory.toString());
}
+  /**
+   * Test helper: moves the node's buffered children from the receiving buffers to the flushing
+   * buffers and then writes the node to the given schema file.
+   *
+   * @param st schema file to write to
+   * @param node node whose container is transferred and written
+   * @throws IOException propagated from {@code writeMNode}
+   * @throws MetadataException propagated from {@code writeMNode}
+   */
+  private void writeMNodeInTest(ISchemaFile st, ICachedMNode node)
+      throws IOException, MetadataException {
+    // The transfer has to happen before the write.
+    getCachedMNodeContainer(node).transferAllBufferReceivingToFlushing();
+    st.writeMNode(node);
+  }
+
@Test
public void essentialTestSchemaFile() throws IOException, MetadataException {
ISchemaFile sf = SchemaFile.initSchemaFile("root.test.vRoot1",
TEST_SCHEMA_REGION_ID);
@@ -101,21 +110,21 @@ public class SchemaFileTest {
while (ite.hasNext()) {
ICachedMNode curNode = ite.next();
if (!curNode.isMeasurement()) {
- sf.writeMNode(curNode);
+ writeMNodeInTest(sf, curNode);
}
}
ICachedMNodeContainer.getCachedMNodeContainer(int0).getNewChildBuffer().clear();
addNodeToUpdateBuffer(int0, getMeasurementNode(int0, "mint1",
"alas99999"));
- sf.writeMNode(int0);
+ writeMNodeInTest(sf, int0);
Assert.assertEquals(
"alas99999", sf.getChildNode(int0,
"mint1").getAsMeasurementMNode().getAlias());
ICachedMNodeContainer.getCachedMNodeContainer(int1).getNewChildBuffer().clear();
ICachedMNodeContainer.getCachedMNodeContainer(int1)
.appendMNode(getMeasurementNode(int1, "int1newM", "alas"));
- sf.writeMNode(int1);
+ writeMNodeInTest(sf, int1);
Assert.assertEquals(
"alas", sf.getChildNode(int1,
"int1newM").getAsMeasurementMNode().getAlias());
@@ -123,12 +132,12 @@ public class SchemaFileTest {
ICachedMNodeContainer.getCachedMNodeContainer(int4)
.getNewChildBuffer()
.put("AAAAA", getMeasurementNode(int4, "AAAAA", "alas"));
- sf.writeMNode(int4);
+ writeMNodeInTest(sf, int4);
Assert.assertEquals("alas", sf.getChildNode(int4,
"AAAAA").getAsMeasurementMNode().getAlias());
ICachedMNodeContainer.getCachedMNodeContainer(int4).getUpdatedChildBuffer().clear();
addNodeToUpdateBuffer(int4, getMeasurementNode(int4, "AAAAA", "BBBBBB"));
- sf.writeMNode(int4);
+ writeMNodeInTest(sf, int4);
Assert.assertEquals(
"BBBBBB", sf.getChildNode(int4,
"AAAAA").getAsMeasurementMNode().getAlias());
@@ -136,7 +145,7 @@ public class SchemaFileTest {
ICachedMNodeContainer.getCachedMNodeContainer(int4)
.getUpdatedChildBuffer()
.put("finalM191", getMeasurementNode(int4, "finalM191",
"ALLLLLLLLLLLLLLLLLLLLfinalM191"));
- sf.writeMNode(int4);
+ writeMNodeInTest(sf, int4);
Assert.assertEquals(
"ALLLLLLLLLLLLLLLLLLLLfinalM191",
sf.getChildNode(int4, "finalM191").getAsMeasurementMNode().getAlias());
@@ -166,14 +175,14 @@ public class SchemaFileTest {
ICachedMNode root = getVerticalTree(100, "VT");
Iterator<ICachedMNode> ite = getTreeBFT(root);
while (ite.hasNext()) {
- sf.writeMNode(ite.next());
+ writeMNodeInTest(sf, ite.next());
}
ICachedMNode vt1 = getNode(root, "root.VT_0.VT_1");
ICachedMNode vt4 = getNode(root, "root.VT_0.VT_1.VT_2.VT_3.VT_4");
ICachedMNodeContainer.getCachedMNodeContainer(vt1).getNewChildBuffer().clear();
addMeasurementChild(vt1, "newM");
- sf.writeMNode(vt1);
+ writeMNodeInTest(sf, vt1);
ICachedMNode vt0 = getNode(root, "root.VT_0");
Assert.assertEquals(
@@ -199,8 +208,8 @@ public class SchemaFileTest {
newNodes.add("r1_" + i);
newNodes.add("r4_" + i);
}
- nsf.writeMNode(vt1);
- nsf.writeMNode(vt4);
+ writeMNodeInTest(nsf, vt1);
+ writeMNodeInTest(nsf, vt4);
nsf.close();
nsf = SchemaFile.loadSchemaFile("root.sgvt.vt", TEST_SCHEMA_REGION_ID);
@@ -226,8 +235,8 @@ public class SchemaFileTest {
newNodes.add("2r1_" + i);
newNodes.add("2r4_" + i);
}
- nsf.writeMNode(vt1);
- nsf.writeMNode(vt4);
+ writeMNodeInTest(nsf, vt1);
+ writeMNodeInTest(nsf, vt4);
Assert.assertEquals(11111L, nsf.init().getAsDatabaseMNode().getDataTTL());
nsf.close();
@@ -241,7 +250,7 @@ public class SchemaFileTest {
while (ite.hasNext()) {
ICachedMNode cur = ite.next();
if (!cur.isMeasurement()) {
- sf.writeMNode(cur);
+ writeMNodeInTest(sf, cur);
}
}
@@ -286,7 +295,7 @@ public class SchemaFileTest {
j--;
}
- sf.writeMNode(dbNode);
+ writeMNodeInTest(sf, dbNode);
ICachedMNode meas;
ICachedMNode dev = dbNode.getChildren().get("dev_2");
@@ -296,7 +305,7 @@ public class SchemaFileTest {
i--;
}
- sf.writeMNode(dev);
+ writeMNodeInTest(sf, dev);
Assert.assertEquals(
"ma_1994", sf.getChildNode(dev,
"m_1994").getAsMeasurementMNode().getAlias());
@@ -315,7 +324,7 @@ public class SchemaFileTest {
// verify operation with massive segment under quadratic complexity
try {
- sf.writeMNode(dbNode);
+ writeMNodeInTest(sf, dbNode);
} finally {
sf.close();
}
@@ -324,7 +333,7 @@ public class SchemaFileTest {
fillChildren(dbNode2, 5000, "MEN", this::supplyEntity);
ISchemaFile sf2 = SchemaFile.initSchemaFile(dbNode2.getName(),
TEST_SCHEMA_REGION_ID);
try {
- sf2.writeMNode(dbNode2);
+ writeMNodeInTest(sf2, dbNode2);
} finally {
sf2.close();
}
@@ -370,7 +379,7 @@ public class SchemaFileTest {
while (orderedTree.hasNext()) {
node = orderedTree.next();
if (!node.isMeasurement()) {
- sf.writeMNode(node);
+ writeMNodeInTest(sf, node);
}
}
@@ -390,7 +399,7 @@ public class SchemaFileTest {
while (orderedTree.hasNext()) {
node = orderedTree.next();
if (!node.isMeasurement()) {
- sf.writeMNode(node);
+ writeMNodeInTest(sf, node);
}
}
} catch (Exception e) {
@@ -415,7 +424,7 @@ public class SchemaFileTest {
while (orderedTree.hasNext()) {
node = orderedTree.next();
if (!node.isMeasurement() && !node.isDatabase()) {
- sf.writeMNode(node);
+ writeMNodeInTest(sf, node);
ICachedMNodeContainer.getCachedMNodeContainer(node).getNewChildBuffer().clear();
}
}
@@ -447,7 +456,7 @@ public class SchemaFileTest {
while (orderedTree.hasNext()) {
node = orderedTree.next();
if (!node.isMeasurement() && !node.isDatabase()) {
- sf.writeMNode(node);
+ writeMNodeInTest(sf, node);
if (Math.random() > 0.5) {
arbitraryNode.add(node);
}
@@ -489,17 +498,18 @@ public class SchemaFileTest {
while (ite.hasNext()) {
ICachedMNode cur = ite.next();
if (!cur.isMeasurement()) {
- sf.writeMNode(cur);
+ writeMNodeInTest(sf, cur);
}
}
root.getChildren().clear();
root.addChild(getMeasurementNode(root, "aa0", "updatedupdatednode"));
+
ICachedMNodeContainer.getCachedMNodeContainer(root).transferAllBufferReceivingToFlushing();
+
ICachedMNodeContainer.getCachedMNodeContainer(root).moveMNodeFromNewChildBufferToCache("aa0");
-
ICachedMNodeContainer.getCachedMNodeContainer(root).moveMNodeToCache("aa0");
ICachedMNodeContainer.getCachedMNodeContainer(root).updateMNode("aa0");
- sf.writeMNode(root);
+ writeMNodeInTest(sf, root);
Assert.assertEquals(
"updatedupdatednode", sf.getChildNode(root,
"aa0").getAsMeasurementMNode().getAlias());
Assert.assertEquals("aa0", sf.getChildNode(root,
"updatedupdatednode").getName());
@@ -511,19 +521,20 @@ public class SchemaFileTest {
ICachedMNode ent1 = root.getChild("ent1");
ent1.addChild(getMeasurementNode(ent1, "m1", "m1a"));
- sf.writeMNode(root);
- sf.writeMNode(ent1);
+ writeMNodeInTest(sf, root);
+ writeMNodeInTest(sf, ent1);
ent1.getChildren().clear();
ent1.addChild(getMeasurementNode(ent1, "m1", "m1aaaaaa"));
- ICachedMNodeContainer.getCachedMNodeContainer(ent1).moveMNodeToCache("m1");
+
ICachedMNodeContainer.getCachedMNodeContainer(ent1).transferAllBufferReceivingToFlushing();
+
ICachedMNodeContainer.getCachedMNodeContainer(ent1).moveMNodeFromNewChildBufferToCache("m1");
ICachedMNodeContainer.getCachedMNodeContainer(ent1).updateMNode("m1");
Assert.assertEquals(
63, getSegment(sf, getSegAddr(sf, getSegAddrInContainer(ent1),
"m1")).size());
- sf.writeMNode(ent1);
+ writeMNodeInTest(sf, ent1);
Assert.assertEquals(
1020, getSegment(sf, getSegAddr(sf, getSegAddrInContainer(ent1),
"m1")).size());
@@ -534,16 +545,16 @@ public class SchemaFileTest {
addMeasurementChild(ent1, "nc" + ent1.getChildren().size());
}
- sf.writeMNode(ent1);
+ writeMNodeInTest(sf, ent1);
ent1.getChildren().clear();
ent1.addChild(getMeasurementNode(ent1, "nc0",
"updated_nc0updated_nc0updated_nc0updated_nc0"));
moveToUpdateBuffer(ent1, "nc0");
- sf.writeMNode(ent1);
+ writeMNodeInTest(sf, ent1);
ent1.getChildren().clear();
ent1.addChild(getMeasurementNode(ent1, "nc1",
"updated_nc1updated_nc1updated_nc1updated_nc1"));
moveToUpdateBuffer(ent1, "nc1");
- sf.writeMNode(ent1);
+ writeMNodeInTest(sf, ent1);
Assert.assertEquals(
getSegAddr(sf, getSegAddrInContainer(ent1), "nc1"),
@@ -563,10 +574,10 @@ public class SchemaFileTest {
ICachedMNode d1 = fillChildren(sgNode, 300, "d", this::supplyEntity);
ISchemaFile sf = SchemaFile.initSchemaFile("root.sg",
TEST_SCHEMA_REGION_ID);
try {
- sf.writeMNode(sgNode);
+ writeMNodeInTest(sf, sgNode);
fillChildren(d1, 46, "s", this::supplyMeasurement);
- sf.writeMNode(d1);
+ writeMNodeInTest(sf, d1);
moveAllToBuffer(d1);
moveAllToBuffer(sgNode);
@@ -575,9 +586,9 @@ public class SchemaFileTest {
// size
// measured by insertion batch and existed size at same time.
fillChildren(sgNode, 350, "sd", this::supplyEntity);
- sf.writeMNode(sgNode);
+ writeMNodeInTest(sf, sgNode);
fillChildren(d1, 20, "ss", this::supplyMeasurement);
- sf.writeMNode(d1);
+ writeMNodeInTest(sf, d1);
Iterator<ICachedMNode> verifyChildren = sf.getChildren(d1);
int cnt = 0;
@@ -626,7 +637,7 @@ public class SchemaFileTest {
while (ite.hasNext()) {
curNode = ite.next();
if (!curNode.isMeasurement()) {
- sf.writeMNode(curNode);
+ writeMNodeInTest(sf, curNode);
}
}
} finally {
@@ -645,7 +656,7 @@ public class SchemaFileTest {
moveToUpdateBuffer(dev2, name);
}
- sf.writeMNode(dev2);
+ writeMNodeInTest(sf, dev2);
sf.sync();
sf.close();
@@ -686,8 +697,9 @@ public class SchemaFileTest {
getMeasurementNode(
ent4, "e4m" + ent4.getChildren().size(), "e4malais" +
ent4.getChildren().size()));
}
- sf.writeMNode(root);
- sf.writeMNode(ent4);
+
+ writeMNodeInTest(sf, root);
+ writeMNodeInTest(sf, ent4);
ent4.getChildren().clear();
ent4.addChild(
@@ -696,21 +708,21 @@ public class SchemaFileTest {
"e4m0",
"updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_updated_"));
moveToUpdateBuffer(ent4, "e4m0");
- sf.writeMNode(ent4);
+ writeMNodeInTest(sf, ent4);
while (ent2.getChildren().size() < 19) {
ent2.addChild(
getMeasurementNode(
ent2, "e2m" + ent2.getChildren().size(), "e2malais" +
ent2.getChildren().size()));
}
- sf.writeMNode(ent2);
+ writeMNodeInTest(sf, ent2);
while (ent3.getChildren().size() < 180) {
ent3.addChild(
getMeasurementNode(
ent3, "e3m" + ent3.getChildren().size(), "e3malais" +
ent3.getChildren().size()));
}
- sf.writeMNode(ent3);
+ writeMNodeInTest(sf, ent3);
ent2.getChildren().clear();
while (ent2.getChildren().size() < 70) {
@@ -718,7 +730,7 @@ public class SchemaFileTest {
getMeasurementNode(
ent2, "e2ms" + ent2.getChildren().size(), "e2is_s2_" +
ent2.getChildren().size()));
}
- sf.writeMNode(ent2);
+ writeMNodeInTest(sf, ent2);
Assert.assertEquals(
getSegAddr(sf, getSegAddrInContainer(ent2), "e2m0") + 65536,
@@ -738,8 +750,9 @@ public class SchemaFileTest {
"e5malaikkkkks" + ent5.getChildren().size()));
}
- sf.writeMNode(root);
- sf.writeMNode(ent5);
+ writeMNodeInTest(sf, root);
+ writeMNodeInTest(sf, ent5);
+
ent5.getChildren().clear();
ent5.addChild(
getMeasurementNode(
@@ -747,7 +760,7 @@ public class SchemaFileTest {
"e5extm",
"e5malaikkkkkse5malaikkkkkse5malaikkkkkse5malaikkkkkse5"
+
"malaikkkkkse5malaikkkkkse5malaikkkkkse5malaikkkkkse5malaikkkkkse5malaikkkkkse5malaikkkkkse5malaikkkkks"));
- sf.writeMNode(ent5);
+ writeMNodeInTest(sf, ent5);
Assert.assertEquals(20, getSegment(sf,
getSegAddrInContainer(ent5)).getAllRecords().size());
Assert.assertEquals(
"e5extm",
@@ -759,7 +772,7 @@ public class SchemaFileTest {
ent5.getChildren().clear();
addNodeToUpdateBuffer(ent5, getMeasurementNode(ent5, "e5extm", null));
- sf.writeMNode(ent5);
+ writeMNodeInTest(sf, ent5);
Assert.assertEquals(null, sf.getChildNode(ent5,
"e5extm").getAsMeasurementMNode().getAlias());
@@ -814,7 +827,7 @@ public class SchemaFileTest {
Iterator<ICachedMNode> orderedTree = getTreeBFT(sgNode);
ISchemaFile sf = SchemaFile.initSchemaFile(sgNode.getName(),
TEST_SCHEMA_REGION_ID);
- sf.writeMNode(sgNode);
+ writeMNodeInTest(sf, sgNode);
Iterator<ICachedMNode> res = sf.getChildren(sgNode);
while (res.hasNext()) {
@@ -847,7 +860,7 @@ public class SchemaFileTest {
}
ISchemaFile sf = SchemaFile.initSchemaFile(sgNode.getName(),
TEST_SCHEMA_REGION_ID);
- sf.writeMNode(sgNode);
+ writeMNodeInTest(sf, sgNode);
Iterator<ICachedMNode> res = sf.getChildren(sgNode);
@@ -888,15 +901,15 @@ public class SchemaFileTest {
d010.addChild(ano);
sgNode.addChild(d010);
- sf.writeMNode(sgNode);
- sf.writeMNode(sgNode.getChildren().get("d_010"));
+ writeMNodeInTest(sf, sgNode);
+ writeMNodeInTest(sf, sgNode.getChildren().get("d_010"));
ano.getAsMeasurementMNode().setAlias("aliaslasialsaialiaslasialsai");
d010.getChildren().clear();
d010.addChild(ano);
moveToUpdateBuffer(d010, "splitover");
- sf.writeMNode(d010);
+ writeMNodeInTest(sf, d010);
int d010cs = 0;
Iterator<ICachedMNode> res2 = sf.getChildren(d010);
@@ -1029,13 +1042,17 @@ public class SchemaFileTest {
static void addNodeToUpdateBuffer(ICachedMNode par, ICachedMNode child) {
ICachedMNodeContainer.getCachedMNodeContainer(par).remove(child.getName());
ICachedMNodeContainer.getCachedMNodeContainer(par).appendMNode(child);
-
ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeToCache(child.getName());
+
ICachedMNodeContainer.getCachedMNodeContainer(par).transferAllBufferReceivingToFlushing();
+ ICachedMNodeContainer.getCachedMNodeContainer(par)
+ .moveMNodeFromNewChildBufferToCache(child.getName());
ICachedMNodeContainer.getCachedMNodeContainer(par).updateMNode(child.getName());
}
static void moveToUpdateBuffer(ICachedMNode par, String childName) {
ICachedMNodeContainer.getCachedMNodeContainer(par).appendMNode(par.getChild(childName));
-
ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeToCache(childName);
+
ICachedMNodeContainer.getCachedMNodeContainer(par).transferAllBufferReceivingToFlushing();
+ ICachedMNodeContainer.getCachedMNodeContainer(par)
+ .moveMNodeFromNewChildBufferToCache(childName);
ICachedMNodeContainer.getCachedMNodeContainer(par).updateMNode(childName);
}
@@ -1043,7 +1060,7 @@ public class SchemaFileTest {
List<String> childNames =
par.getChildren().values().stream().map(IMNode::getName).collect(Collectors.toList());
for (String name : childNames) {
-
ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeToCache(name);
+
ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeFromNewChildBufferToCache(name);
ICachedMNodeContainer.getCachedMNodeContainer(par).updateMNode(name);
}
}
@@ -1052,7 +1069,7 @@ public class SchemaFileTest {
List<String> childNames =
par.getChildren().values().stream().map(IMNode::getName).collect(Collectors.toList());
for (String name : childNames) {
-
ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeToCache(name);
+
ICachedMNodeContainer.getCachedMNodeContainer(par).moveMNodeFromNewChildBufferToCache(name);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/PBTreeFileSketchTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/PBTreeFileSketchTest.java
index 1329edaef28..93110222692 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/PBTreeFileSketchTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/PBTreeFileSketchTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFile;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
@@ -49,6 +50,8 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
+import static
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getCachedMNodeContainer;
+
public class PBTreeFileSketchTest {
private final IMNodeFactory<ICachedMNode> nodeFactory =
@@ -80,6 +83,8 @@ public class PBTreeFileSketchTest {
while (ite.hasNext()) {
ICachedMNode cur = ite.next();
if (!cur.isMeasurement()) {
+ ICachedMNodeContainer container = getCachedMNodeContainer(cur);
+ container.transferAllBufferReceivingToFlushing();
sf.writeMNode(cur);
}
}