This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ba866841453 Pbtree: MNode iterating with merge sort upon disk and
buffer (#12077)
ba866841453 is described below
commit ba86684145307c6755214df7506cc1a95cb59df3
Author: linxt20 <[email protected]>
AuthorDate: Wed Feb 28 09:49:46 2024 +0800
Pbtree: MNode iterating with merge sort upon disk and buffer (#12077)
---
.../mtree/impl/pbtree/CachedMTreeStore.java | 141 ++++++++-------------
.../mnode/container/CachedMNodeContainer.java | 25 +++-
.../pbtree/mnode/container/MNodeChildBuffer.java | 67 +++-------
.../iotdb/commons/schema/MergeSortIterator.java | 122 ++++++++++++++++++
4 files changed, 212 insertions(+), 143 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 2addd6e908d..e15245d3c89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.MergeSortIterator;
import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
@@ -49,9 +50,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -648,14 +649,12 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
*/
private class CachedMNodeIterator implements IMNodeIterator<ICachedMNode> {
ICachedMNode parent;
- Iterator<ICachedMNode> iterator;
+ CachedMNodeMergeIterator mergeIterator;
Iterator<ICachedMNode> bufferIterator;
- boolean isIteratingDisk;
- ICachedMNode nextNode;
+ Iterator<ICachedMNode> diskIterator;
boolean needLock;
boolean isLocked;
-
long readLockStamp;
CachedMNodeIterator(ICachedMNode parent, boolean needLock)
@@ -670,14 +669,9 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
this.parent = parent;
ICachedMNodeContainer container =
ICachedMNodeContainer.getCachedMNodeContainer(parent);
bufferIterator = container.getChildrenBufferIterator();
- if (!container.isVolatile()) {
- this.iterator = file.getChildren(parent);
- isIteratingDisk = true;
- } else {
- iterator = bufferIterator;
- isIteratingDisk = false;
- }
-
+ diskIterator =
+ !container.isVolatile() ? file.getChildren(parent) :
Collections.emptyIterator();
+ mergeIterator = new CachedMNodeMergeIterator(diskIterator,
bufferIterator);
} catch (Throwable e) {
lockManager.stampedReadUnlock(parent, readLockStamp);
if (needLock) {
@@ -690,77 +684,12 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
@Override
public boolean hasNext() {
- if (nextNode != null) {
- return true;
- } else {
- try {
- readNext();
- } catch (MetadataException e) {
- LOGGER.error("Error occurred during readNext, {}", e.getMessage());
- return false;
- }
- return nextNode != null;
- }
+ return mergeIterator.hasNext();
}
- // must invoke hasNext() first
@Override
public ICachedMNode next() {
- if (nextNode == null && !hasNext()) {
- throw new NoSuchElementException();
- }
- ICachedMNode result = nextNode;
- nextNode = null;
- return result;
- }
-
- private void readNext() throws MetadataException {
- ICachedMNode node = null;
- if (isIteratingDisk) {
- ICachedMNodeContainer container =
ICachedMNodeContainer.getCachedMNodeContainer(parent);
- if (iterator.hasNext()) {
- node = iterator.next();
- while (container.hasChildInBuffer(node.getName())) {
- if (iterator.hasNext()) {
- node = iterator.next();
- } else {
- node = null;
- break;
- }
- }
- }
- if (node != null) {
- ICachedMNode nodeInMem = parent.getChild(node.getName());
- if (nodeInMem != null) {
- // this branch means the node load from disk is in cache, thus use
the instance in
- // cache
- try {
- memoryManager.updateCacheStatusAfterMemoryRead(nodeInMem);
- node = nodeInMem;
- } catch (MNodeNotCachedException e) {
- node = loadChildFromDiskToParent(parent, node);
- }
- } else {
- node = loadChildFromDiskToParent(parent, node);
- }
- nextNode = node;
- return;
- } else {
- startIteratingBuffer();
- }
- }
-
- if (iterator.hasNext()) {
- node = iterator.next();
- // node in buffer won't be evicted during Iteration
- memoryManager.updateCacheStatusAfterMemoryRead(node);
- }
- nextNode = node;
- }
-
- private void startIteratingBuffer() {
- iterator = bufferIterator;
- isIteratingDisk = false;
+ return mergeIterator.next();
}
@Override
@@ -770,19 +699,51 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
@Override
public void close() {
- try {
- if (nextNode != null) {
- unPin(nextNode, false);
- nextNode = null;
+ if (isLocked) {
+ lockManager.stampedReadUnlock(parent, readLockStamp);
+ if (needLock) {
+ lockManager.globalReadUnlock();
}
- } finally {
- if (isLocked) {
- lockManager.stampedReadUnlock(parent, readLockStamp);
- if (needLock) {
- lockManager.globalReadUnlock();
+ isLocked = false;
+ }
+ }
+
+ private class CachedMNodeMergeIterator extends
MergeSortIterator<ICachedMNode> {
+ public CachedMNodeMergeIterator(
+ Iterator<ICachedMNode> diskIterator, Iterator<ICachedMNode>
bufferIterator) {
+ super(diskIterator, bufferIterator);
+ }
+
+ protected ICachedMNode onReturnLeft(ICachedMNode ansMNode) {
+ ICachedMNode nodeInMem = parent.getChild(ansMNode.getName());
+ if (nodeInMem != null) {
+ try {
+ memoryManager.updateCacheStatusAfterMemoryRead(nodeInMem);
+ ansMNode = nodeInMem;
+ } catch (MNodeNotCachedException e) {
+ ansMNode = loadChildFromDiskToParent(parent, ansMNode);
}
- isLocked = false;
+ } else {
+ ansMNode = loadChildFromDiskToParent(parent, ansMNode);
+ }
+ return ansMNode;
+ }
+
+ protected ICachedMNode onReturnRight(ICachedMNode ansMNode) {
+ try {
+ memoryManager.updateCacheStatusAfterMemoryRead(ansMNode);
+ } catch (MNodeNotCachedException e) {
+ throw new RuntimeException(e);
}
+ return ansMNode;
+ }
+
+ protected int decide() {
+ return 1;
+ }
+
+ protected int compare(ICachedMNode left, ICachedMNode right) {
+ return left.getName().compareTo(right.getName());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
index 20987ac41e5..0e5f272f71b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
+import org.apache.iotdb.commons.schema.MergeSortIterator;
import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -254,7 +255,7 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
@Override
public Iterator<ICachedMNode> getChildrenBufferIterator() {
- return new CachedMNodeContainerIterator((byte) 1);
+ return new BufferIterator();
}
@Override
@@ -408,11 +409,11 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
private boolean changeStatus() {
switch (status) {
case 0:
- iterator = getChildCache().values().iterator();
+ iterator = getNewChildBuffer().getMNodeChildBufferIterator();
status = 1;
return true;
case 1:
- iterator = getNewChildBuffer().getMNodeChildBufferIterator();
+ iterator = getUpdatedChildBuffer().getMNodeChildBufferIterator();
status = 2;
return true;
case 2:
@@ -425,6 +426,24 @@ public class CachedMNodeContainer implements
ICachedMNodeContainer {
}
}
+ private class BufferIterator extends MergeSortIterator<ICachedMNode> {
+
+ BufferIterator() {
+ super(
+ getNewChildBuffer().getMNodeChildBufferIterator(),
+ getUpdatedChildBuffer().getMNodeChildBufferIterator());
+ }
+
+ protected int decide() {
+ throw new IllegalStateException(
+ "There shall not exist two node with the same name separately in
newChildBuffer and updateChildBuffer");
+ }
+
+ protected int compare(ICachedMNode left, ICachedMNode right) {
+ return left.getName().compareTo(right.getName());
+ }
+ }
+
private static class EmptyContainer extends AbstractMap<String, ICachedMNode>
implements IMNodeContainer<ICachedMNode> {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
index 838310f558e..cd79d554f69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
+import org.apache.iotdb.commons.schema.MergeSortIterator;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import javax.annotation.Nonnull;
@@ -33,7 +34,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -198,62 +198,29 @@ public abstract class MNodeChildBuffer implements
IMNodeChildBuffer {
throw new UnsupportedOperationException();
}
- private class MNodeChildBufferIterator implements Iterator<ICachedMNode> {
+ private Iterator<ICachedMNode> getSortedReceivingBuffer() {
+ List<ICachedMNode> receivingBufferList = new
ArrayList<>(getReceivingBuffer().values());
+ receivingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
+ return receivingBufferList.iterator();
+ }
- int flushingIndex = 0;
- int receivingIndex = 0;
- List<ICachedMNode> flushingBufferList;
- List<ICachedMNode> receivingBufferList;
+ private Iterator<ICachedMNode> getSortedFlushingBuffer() {
+ List<ICachedMNode> flushingBufferList = new
ArrayList<>(getFlushingBuffer().values());
+ flushingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
+ return flushingBufferList.iterator();
+ }
+ private class MNodeChildBufferIterator extends
MergeSortIterator<ICachedMNode> {
MNodeChildBufferIterator() {
- // use merge sort to merge them and remove duplicates.
- List<ICachedMNode> list = new ArrayList<>();
- receivingBufferList = new ArrayList<>(getReceivingBuffer().values());
- flushingBufferList = new ArrayList<>(getFlushingBuffer().values());
- receivingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
- flushingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
- }
-
- private ICachedMNode tryGetNext() {
- // There are only three situations here, namely, both are left, and each
of the two is left,
- // and then gradually merge and remove duplicates.
- if (receivingIndex < receivingBufferList.size()
- && flushingIndex < flushingBufferList.size()) {
- ICachedMNode node1 = receivingBufferList.get(receivingIndex);
- ICachedMNode node2 = flushingBufferList.get(flushingIndex);
- if (node1.getName().compareTo(node2.getName()) < 0) {
- receivingIndex++;
- return node1;
- } else if (node1.getName().compareTo(node2.getName()) > 0) {
- flushingIndex++;
- return node2;
- } else {
- receivingIndex++;
- flushingIndex++;
- return node1;
- }
- }
- if (receivingIndex < receivingBufferList.size()) {
- return receivingBufferList.get(receivingIndex++);
- }
- if (flushingIndex < flushingBufferList.size()) {
- return flushingBufferList.get(flushingIndex++);
- }
- throw new NoSuchElementException();
+ super(getSortedReceivingBuffer(), getSortedFlushingBuffer());
}
- @Override
- public boolean hasNext() {
- return flushingIndex < flushingBufferList.size()
- || receivingIndex < receivingBufferList.size();
+ protected int decide() {
+ return -1;
}
- @Override
- public ICachedMNode next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return tryGetNext();
+ protected int compare(ICachedMNode left, ICachedMNode right) {
+ return left.getName().compareTo(right.getName());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MergeSortIterator.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MergeSortIterator.java
new file mode 100644
index 00000000000..a393e89e58a
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MergeSortIterator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.schema;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public abstract class MergeSortIterator<E> implements Iterator<E> {
+  private final Iterator<E> leftIterator; // iterator of the sequence on the left
+  private final Iterator<E> rightIterator; // iterator of the sequence on the right
+
+  // Buffered heads; null marks an exhausted side, so null elements are not supported.
+  private E leftHeader;
+  private E rightHeader;
+
+  // Buffers the first element of each sequence; inputs are expected to be ordered by compare.
+  protected MergeSortIterator(Iterator<E> leftIterator, Iterator<E> rightIterator) {
+    this.leftIterator = leftIterator;
+    this.rightIterator = rightIterator;
+    leftHeader = nextOrNull(leftIterator);
+    rightHeader = nextOrNull(rightIterator);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return leftHeader != null || rightHeader != null;
+  }
+
+  // Returns the next merged element, or throws NoSuchElementException when none is left.
+  @Override
+  public E next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return tryGetNext();
+  }
+
+  // Pulls the next element of a source, or null once the source is exhausted.
+  private static <T> T nextOrNull(Iterator<T> source) {
+    return source.hasNext() ? source.next() : null;
+  }
+
+  // Takes the buffered left element and refills the left buffer.
+  private E catchLeft() {
+    E ans = leftHeader;
+    leftHeader = nextOrNull(leftIterator);
+    return ans;
+  }
+
+  // Takes the buffered right element and refills the right buffer.
+  private E catchRight() {
+    E ans = rightHeader;
+    rightHeader = nextOrNull(rightIterator);
+    return ans;
+  }
+
+  // Resolves a tie: -1 keeps left, 1 keeps right; the dropped element skips the onReturn hooks.
+  private E catchEqual(int decide) {
+    switch (decide) {
+      case -1:
+        rightHeader = nextOrNull(rightIterator);
+        return onReturnLeft(catchLeft());
+      case 1:
+        leftHeader = nextOrNull(leftIterator);
+        return onReturnRight(catchRight());
+      default:
+        throw new IllegalArgumentException("decide() must return -1 or 1, but got " + decide);
+    }
+  }
+
+  // One merge step; compare is evaluated only once per pair of heads.
+  E tryGetNext() {
+    if (leftHeader != null && rightHeader != null) {
+      int order = compare(leftHeader, rightHeader);
+      if (order == 0) {
+        return catchEqual(decide());
+      }
+      return order < 0 ? onReturnLeft(catchLeft()) : onReturnRight(catchRight());
+    } else if (leftHeader != null) {
+      return onReturnLeft(catchLeft());
+    } else if (rightHeader != null) {
+      return onReturnRight(catchRight());
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  // Post-process an element taken from the left sequence before it is returned
+  protected E onReturnLeft(E left) {
+    return left;
+  }
+
+  // Post-process an element taken from the right sequence before it is returned
+  protected E onReturnRight(E right) {
+    return right;
+  }
+
+  // Tie-break hook: -1 keeps left, 1 keeps right; the default 0 makes catchEqual throw.
+  protected int decide() {
+    return 0;
+  }
+
+  // Compare two elements
+  protected abstract int compare(E left, E right);
+}