This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba866841453 Pbtree: MNode iterating with merge sort upon disk and buffer (#12077)
ba866841453 is described below

commit ba86684145307c6755214df7506cc1a95cb59df3
Author: linxt20 <[email protected]>
AuthorDate: Wed Feb 28 09:49:46 2024 +0800

    Pbtree: MNode iterating with merge sort upon disk and buffer (#12077)
---
 .../mtree/impl/pbtree/CachedMTreeStore.java        | 141 ++++++++-------------
 .../mnode/container/CachedMNodeContainer.java      |  25 +++-
 .../pbtree/mnode/container/MNodeChildBuffer.java   |  67 +++-------
 .../iotdb/commons/schema/MergeSortIterator.java    | 122 ++++++++++++++++++
 4 files changed, 212 insertions(+), 143 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 2addd6e908d..e15245d3c89 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.MergeSortIterator;
 import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
 import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
@@ -49,9 +50,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -648,14 +649,12 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
    */
   private class CachedMNodeIterator implements IMNodeIterator<ICachedMNode> {
     ICachedMNode parent;
-    Iterator<ICachedMNode> iterator;
+    CachedMNodeMergeIterator mergeIterator;
     Iterator<ICachedMNode> bufferIterator;
-    boolean isIteratingDisk;
-    ICachedMNode nextNode;
+    Iterator<ICachedMNode> diskIterator;
 
     boolean needLock;
     boolean isLocked;
-
     long readLockStamp;
 
     CachedMNodeIterator(ICachedMNode parent, boolean needLock)
@@ -670,14 +669,9 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
         this.parent = parent;
         ICachedMNodeContainer container = 
ICachedMNodeContainer.getCachedMNodeContainer(parent);
         bufferIterator = container.getChildrenBufferIterator();
-        if (!container.isVolatile()) {
-          this.iterator = file.getChildren(parent);
-          isIteratingDisk = true;
-        } else {
-          iterator = bufferIterator;
-          isIteratingDisk = false;
-        }
-
+        diskIterator =
+            !container.isVolatile() ? file.getChildren(parent) : 
Collections.emptyIterator();
+        mergeIterator = new CachedMNodeMergeIterator(diskIterator, 
bufferIterator);
       } catch (Throwable e) {
         lockManager.stampedReadUnlock(parent, readLockStamp);
         if (needLock) {
@@ -690,77 +684,12 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
 
     @Override
     public boolean hasNext() {
-      if (nextNode != null) {
-        return true;
-      } else {
-        try {
-          readNext();
-        } catch (MetadataException e) {
-          LOGGER.error("Error occurred during readNext, {}", e.getMessage());
-          return false;
-        }
-        return nextNode != null;
-      }
+      return mergeIterator.hasNext();
     }
 
-    // must invoke hasNext() first
     @Override
     public ICachedMNode next() {
-      if (nextNode == null && !hasNext()) {
-        throw new NoSuchElementException();
-      }
-      ICachedMNode result = nextNode;
-      nextNode = null;
-      return result;
-    }
-
-    private void readNext() throws MetadataException {
-      ICachedMNode node = null;
-      if (isIteratingDisk) {
-        ICachedMNodeContainer container = 
ICachedMNodeContainer.getCachedMNodeContainer(parent);
-        if (iterator.hasNext()) {
-          node = iterator.next();
-          while (container.hasChildInBuffer(node.getName())) {
-            if (iterator.hasNext()) {
-              node = iterator.next();
-            } else {
-              node = null;
-              break;
-            }
-          }
-        }
-        if (node != null) {
-          ICachedMNode nodeInMem = parent.getChild(node.getName());
-          if (nodeInMem != null) {
-            // this branch means the node load from disk is in cache, thus use 
the instance in
-            // cache
-            try {
-              memoryManager.updateCacheStatusAfterMemoryRead(nodeInMem);
-              node = nodeInMem;
-            } catch (MNodeNotCachedException e) {
-              node = loadChildFromDiskToParent(parent, node);
-            }
-          } else {
-            node = loadChildFromDiskToParent(parent, node);
-          }
-          nextNode = node;
-          return;
-        } else {
-          startIteratingBuffer();
-        }
-      }
-
-      if (iterator.hasNext()) {
-        node = iterator.next();
-        // node in buffer won't be evicted during Iteration
-        memoryManager.updateCacheStatusAfterMemoryRead(node);
-      }
-      nextNode = node;
-    }
-
-    private void startIteratingBuffer() {
-      iterator = bufferIterator;
-      isIteratingDisk = false;
+      return mergeIterator.next();
     }
 
     @Override
@@ -770,19 +699,51 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
 
     @Override
     public void close() {
-      try {
-        if (nextNode != null) {
-          unPin(nextNode, false);
-          nextNode = null;
+      if (isLocked) {
+        lockManager.stampedReadUnlock(parent, readLockStamp);
+        if (needLock) {
+          lockManager.globalReadUnlock();
         }
-      } finally {
-        if (isLocked) {
-          lockManager.stampedReadUnlock(parent, readLockStamp);
-          if (needLock) {
-            lockManager.globalReadUnlock();
+        isLocked = false;
+      }
+    }
+
+    private class CachedMNodeMergeIterator extends 
MergeSortIterator<ICachedMNode> {
+      public CachedMNodeMergeIterator(
+          Iterator<ICachedMNode> diskIterator, Iterator<ICachedMNode> 
bufferIterator) {
+        super(diskIterator, bufferIterator);
+      }
+
+      protected ICachedMNode onReturnLeft(ICachedMNode ansMNode) {
+        ICachedMNode nodeInMem = parent.getChild(ansMNode.getName());
+        if (nodeInMem != null) {
+          try {
+            memoryManager.updateCacheStatusAfterMemoryRead(nodeInMem);
+            ansMNode = nodeInMem;
+          } catch (MNodeNotCachedException e) {
+            ansMNode = loadChildFromDiskToParent(parent, ansMNode);
           }
-          isLocked = false;
+        } else {
+          ansMNode = loadChildFromDiskToParent(parent, ansMNode);
+        }
+        return ansMNode;
+      }
+
+      protected ICachedMNode onReturnRight(ICachedMNode ansMNode) {
+        try {
+          memoryManager.updateCacheStatusAfterMemoryRead(ansMNode);
+        } catch (MNodeNotCachedException e) {
+          throw new RuntimeException(e);
         }
+        return ansMNode;
+      }
+
+      protected int decide() {
+        return 1;
+      }
+
+      protected int compare(ICachedMNode left, ICachedMNode right) {
+        return left.getName().compareTo(right.getName());
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
index 20987ac41e5..0e5f272f71b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/CachedMNodeContainer.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
 
+import org.apache.iotdb.commons.schema.MergeSortIterator;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 
@@ -254,7 +255,7 @@ public class CachedMNodeContainer implements 
ICachedMNodeContainer {
 
   @Override
   public Iterator<ICachedMNode> getChildrenBufferIterator() {
-    return new CachedMNodeContainerIterator((byte) 1);
+    return new BufferIterator();
   }
 
   @Override
@@ -408,11 +409,11 @@ public class CachedMNodeContainer implements 
ICachedMNodeContainer {
     private boolean changeStatus() {
       switch (status) {
         case 0:
-          iterator = getChildCache().values().iterator();
+          iterator = getNewChildBuffer().getMNodeChildBufferIterator();
           status = 1;
           return true;
         case 1:
-          iterator = getNewChildBuffer().getMNodeChildBufferIterator();
+          iterator = getUpdatedChildBuffer().getMNodeChildBufferIterator();
           status = 2;
           return true;
         case 2:
@@ -425,6 +426,24 @@ public class CachedMNodeContainer implements 
ICachedMNodeContainer {
     }
   }
 
+  private class BufferIterator extends MergeSortIterator<ICachedMNode> {
+
+    BufferIterator() {
+      super(
+          getNewChildBuffer().getMNodeChildBufferIterator(),
+          getUpdatedChildBuffer().getMNodeChildBufferIterator());
+    }
+
+    protected int decide() {
+      throw new IllegalStateException(
+          "There shall not exist two node with the same name separately in 
newChildBuffer and updateChildBuffer");
+    }
+
+    protected int compare(ICachedMNode left, ICachedMNode right) {
+      return left.getName().compareTo(right.getName());
+    }
+  }
+
   private static class EmptyContainer extends AbstractMap<String, ICachedMNode>
       implements IMNodeContainer<ICachedMNode> {
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
index 838310f558e..cd79d554f69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/mnode/container/MNodeChildBuffer.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container;
 
+import org.apache.iotdb.commons.schema.MergeSortIterator;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 
 import javax.annotation.Nonnull;
@@ -33,7 +34,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -198,62 +198,29 @@ public abstract class MNodeChildBuffer implements 
IMNodeChildBuffer {
     throw new UnsupportedOperationException();
   }
 
-  private class MNodeChildBufferIterator implements Iterator<ICachedMNode> {
+  private Iterator<ICachedMNode> getSortedReceivingBuffer() {
+    List<ICachedMNode> receivingBufferList = new 
ArrayList<>(getReceivingBuffer().values());
+    receivingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
+    return receivingBufferList.iterator();
+  }
 
-    int flushingIndex = 0;
-    int receivingIndex = 0;
-    List<ICachedMNode> flushingBufferList;
-    List<ICachedMNode> receivingBufferList;
+  private Iterator<ICachedMNode> getSortedFlushingBuffer() {
+    List<ICachedMNode> flushingBufferList = new 
ArrayList<>(getFlushingBuffer().values());
+    flushingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
+    return flushingBufferList.iterator();
+  }
 
+  private class MNodeChildBufferIterator extends 
MergeSortIterator<ICachedMNode> {
     MNodeChildBufferIterator() {
-      // use merge sort to merge them and remove duplicates.
-      List<ICachedMNode> list = new ArrayList<>();
-      receivingBufferList = new ArrayList<>(getReceivingBuffer().values());
-      flushingBufferList = new ArrayList<>(getFlushingBuffer().values());
-      receivingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
-      flushingBufferList.sort(Comparator.comparing(ICachedMNode::getName));
-    }
-
-    private ICachedMNode tryGetNext() {
-      // There are only three situations here, namely, both are left, and each 
of the two is left,
-      // and then gradually merge and remove duplicates.
-      if (receivingIndex < receivingBufferList.size()
-          && flushingIndex < flushingBufferList.size()) {
-        ICachedMNode node1 = receivingBufferList.get(receivingIndex);
-        ICachedMNode node2 = flushingBufferList.get(flushingIndex);
-        if (node1.getName().compareTo(node2.getName()) < 0) {
-          receivingIndex++;
-          return node1;
-        } else if (node1.getName().compareTo(node2.getName()) > 0) {
-          flushingIndex++;
-          return node2;
-        } else {
-          receivingIndex++;
-          flushingIndex++;
-          return node1;
-        }
-      }
-      if (receivingIndex < receivingBufferList.size()) {
-        return receivingBufferList.get(receivingIndex++);
-      }
-      if (flushingIndex < flushingBufferList.size()) {
-        return flushingBufferList.get(flushingIndex++);
-      }
-      throw new NoSuchElementException();
+      super(getSortedReceivingBuffer(), getSortedFlushingBuffer());
     }
 
-    @Override
-    public boolean hasNext() {
-      return flushingIndex < flushingBufferList.size()
-          || receivingIndex < receivingBufferList.size();
+    protected int decide() {
+      return -1;
     }
 
-    @Override
-    public ICachedMNode next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      return tryGetNext();
+    protected int compare(ICachedMNode left, ICachedMNode right) {
+      return left.getName().compareTo(right.getName());
     }
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MergeSortIterator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MergeSortIterator.java
new file mode 100644
index 00000000000..a393e89e58a
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MergeSortIterator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.schema;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public abstract class MergeSortIterator<E> implements Iterator<E> {
+  private Iterator<E> leftIterator; // iterator of the sequence on the left
+  private Iterator<E> rightIterator; // iterator of the sequence on the right
+
+  private E leftHeader; // The first element of the sequence on the left
+  private E rightHeader; // The first element of the sequence on the right
+
+  protected MergeSortIterator(Iterator<E> leftIterator, Iterator<E> 
rightIterator) {
+    this.leftIterator = leftIterator;
+    this.rightIterator = rightIterator;
+    leftHeader = leftIterator.hasNext() ? leftIterator.next() : null;
+    rightHeader = rightIterator.hasNext() ? rightIterator.next() : null;
+  }
+
+  // Determine whether there is a next element
+  public boolean hasNext() {
+    return leftHeader != null || rightHeader != null;
+  }
+
+  // Get the next element. If there is no next element, an error will be reported.
+  public E next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return tryGetNext();
+  }
+
+  // In merge sort, the first element of the left sequence is added to the sorted sequence while
+  // update the leftHeader.
+  private E catchLeft() {
+    E ans = leftHeader;
+    leftHeader = leftIterator.hasNext() ? leftIterator.next() : null;
+    return ans;
+  }
+
+  // In merge sort, the first element of the right sequence is added to the sorted sequence while
+  // update the rightHeader.
+  private E catchRight() {
+    E ans = rightHeader;
+    rightHeader = rightIterator.hasNext() ? rightIterator.next() : null;
+    return ans;
+  }
+
+  // When two elements are the same, according to the choice of decide, the target element is left
+  // and the other element is deleted.
+  private E catchEqual(int decide) {
+    switch (decide) {
+      case -1:
+        rightHeader = rightIterator.hasNext() ? rightIterator.next() : null;
+        return onReturnLeft(catchLeft());
+      case 1:
+        leftHeader = leftIterator.hasNext() ? leftIterator.next() : null;
+        return onReturnRight(catchRight());
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  // One step in merge sort: compare the first elements of the two sequences and process them based
+  // on the comparison results
+  E tryGetNext() {
+    if (leftHeader != null && rightHeader != null) {
+      if (compare(leftHeader, rightHeader) == 0) {
+        return catchEqual(decide());
+      } else if (compare(leftHeader, rightHeader) < 0) {
+        return onReturnLeft(catchLeft());
+      } else if (compare(leftHeader, rightHeader) > 0) {
+        return onReturnRight(catchRight());
+      } else {
+        throw new IllegalArgumentException();
+      }
+    } else if (leftHeader != null) {
+      return onReturnLeft(catchLeft());
+    } else if (rightHeader != null) {
+      return onReturnRight(catchRight());
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  // Post-process the first element of the left sequence
+  protected E onReturnLeft(E left) {
+    return left;
+  }
+
+  // Post-process the first element of the right sequence
+  protected E onReturnRight(E right) {
+    return right;
+  }
+
+  // Decide the target element when two elements are the same
+  protected int decide() {
+    return 0;
+  }
+
+  // Compare two elements
+  protected abstract int compare(E left, E right);
+}

Reply via email to