This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch transferSchemaTreeInBatches
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/transferSchemaTreeInBatches by this push:
     new 212981062dd add memory control for schema tree
212981062dd is described below

commit 212981062dd49dd1939a4d02e54f2caa7a2bcfb2
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 25 12:25:56 2025 +0800

    add memory control for schema tree
---
 .../db/queryengine/common/MPPQueryContext.java     | 33 +++++++++++++++++
 .../common/schematree/ClusterSchemaTree.java       | 41 ++++++++++++++++++----
 .../queryengine/common/schematree/ISchemaTree.java |  3 +-
 .../common/schematree/node/SchemaEntityNode.java   | 20 +++++++++++
 .../common/schematree/node/SchemaInternalNode.java | 16 +++++++++
 .../schematree/node/SchemaMeasurementNode.java     | 14 ++++++++
 .../common/schematree/node/SchemaNode.java         |  4 ++-
 .../execution/MemoryEstimationHelper.java          | 24 +++++++++++++
 .../operator/schema/SchemaFetchScanOperator.java   | 31 +++++++++++++---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  2 ++
 .../db/queryengine/plan/analyze/Analyzer.java      | 14 ++++++--
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 18 ++++++----
 .../iotdb/db/schemaengine/template/Template.java   | 16 ++++++++-
 13 files changed, 214 insertions(+), 22 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index b4bd87b2e7e..080a8f1b0de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.LongConsumer;
 
 /**
  * This class is used to record the context of a query including QueryId, 
query statement, session
@@ -89,6 +90,10 @@ public class MPPQueryContext {
   // the updateScanNum process in distributed planning can be skipped.
   private boolean needUpdateScanNumForLastQuery = false;
 
+  private long reservedMemoryCostForSchemaTree = 0;
+  private boolean releaseSchemaTreeAfterAnalyzing = true;
+  private LongConsumer reserveMemoryForSchemaTreeFunc = null;
+
   private boolean userQuery = false;
 
   public MPPQueryContext(QueryId queryId) {
@@ -129,6 +134,34 @@ public class MPPQueryContext {
     this.initResultNodeContext();
   }
 
+  public void setReserveMemoryForSchemaTreeFunc(LongConsumer 
reserveMemoryForSchemaTreeFunc) {
+    this.reserveMemoryForSchemaTreeFunc = reserveMemoryForSchemaTreeFunc;
+  }
+
+  public void reserveMemoryForSchemaTree(long memoryCost) {
+    if (reserveMemoryForSchemaTreeFunc == null) {
+      return;
+    }
+    reserveMemoryForSchemaTreeFunc.accept(memoryCost);
+    this.reservedMemoryCostForSchemaTree += memoryCost;
+  }
+
+  public void setReleaseSchemaTreeAfterAnalyzing(boolean 
releaseSchemaTreeAfterAnalyzing) {
+    this.releaseSchemaTreeAfterAnalyzing = releaseSchemaTreeAfterAnalyzing;
+  }
+
+  public boolean releaseSchemaTreeAfterAnalyzing() {
+    return releaseSchemaTreeAfterAnalyzing;
+  }
+
+  public void releaseMemoryForSchemaTree() {
+    if (reservedMemoryCostForSchemaTree <= 0) {
+      return;
+    }
+    
this.memoryReservationManager.releaseMemoryCumulatively(reservedMemoryCostForSchemaTree);
+    reservedMemoryCostForSchemaTree = 0;
+  }
+
   public void prepareForRetry() {
     this.initResultNodeContext();
     this.releaseAllMemoryReservedForFrontEnd();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
index 6e9806a4ee4..6d6182a4e68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.schemaengine.template.Template;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
@@ -63,6 +64,8 @@ import static 
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.
 import static 
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
 
 public class ClusterSchemaTree implements ISchemaTree {
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(ClusterSchemaTree.class);
   private static final ClusterTemplateManager templateManager =
       ClusterTemplateManager.getInstance();
 
@@ -78,6 +81,8 @@ public class ClusterSchemaTree implements ISchemaTree {
 
   private Map<Integer, Template> templateMap = new HashMap<>();
 
+  private long memCost;
+
   public ClusterSchemaTree() {
     root = new SchemaInternalNode(PATH_ROOT);
   }
@@ -491,6 +496,21 @@ public class ClusterSchemaTree implements ISchemaTree {
     return new SchemaNodePostOrderIterator(root);
   }
 
+  @Override
+  public long ramBytesUsed() {
+    if (memCost > 0) {
+      return memCost;
+    }
+    long startTime = System.currentTimeMillis();
+    memCost = root.ramBytesUsed() + SHALLOW_SIZE + 
RamUsageEstimator.sizeOfMap(templateMap);
+    System.out.println(System.currentTimeMillis() - startTime);
+    return memCost;
+  }
+
+  public void setMemCost(long memCost) {
+    this.memCost = memCost;
+  }
+
   private static class SchemaNodePostOrderIterator implements 
Iterator<SchemaNode> {
     private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new 
ArrayDeque<>();
     private SchemaNode nextNode;
@@ -534,15 +554,21 @@ public class ClusterSchemaTree implements ISchemaTree {
   }
 
   public static class SchemaNodeBatchDeserializer {
-    byte nodeType;
-    int childNum;
-    Deque<SchemaNode> stack = new ArrayDeque<>();
-    SchemaNode child;
-    boolean hasLogicalView = false;
-    boolean hasNormalTimeSeries = false;
-    Map<Integer, Template> templateMap = new HashMap<>();
+    private byte nodeType;
+    private int childNum;
+    private Deque<SchemaNode> stack = new ArrayDeque<>();
+    private SchemaNode child;
+    private boolean hasLogicalView = false;
+    private boolean hasNormalTimeSeries = false;
+    private Map<Integer, Template> templateMap = new HashMap<>();
+    private boolean isFirstBatch = true;
+
+    public boolean isFirstBatch() {
+      return isFirstBatch;
+    }
 
     public void deserializeFromBatch(InputStream inputStream) throws 
IOException {
+      isFirstBatch = false;
       while (inputStream.available() > 0) {
         nodeType = ReadWriteIOUtils.readByte(inputStream);
         if (nodeType == SCHEMA_MEASUREMENT_NODE) {
@@ -598,6 +624,7 @@ public class ClusterSchemaTree implements ISchemaTree {
         hasLogicalView = false;
         hasNormalTimeSeries = false;
         templateMap = new HashMap<>();
+        isFirstBatch = true;
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
index 823f7f94c00..543991a6ff7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
@@ -25,12 +25,13 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.schemaengine.template.Template;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Accountable;
 import org.apache.tsfile.utils.Pair;
 
 import java.util.List;
 import java.util.Set;
 
-public interface ISchemaTree {
+public interface ISchemaTree extends Accountable {
   /**
    * Return all measurement paths for given path pattern and filter the result 
by slimit and offset.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
index c160a81f20f..2cb8ede8023 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.common.schematree.node;
 
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -31,6 +34,9 @@ import static 
org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
 
 public class SchemaEntityNode extends SchemaInternalNode {
 
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(SchemaEntityNode.class);
+
   private boolean isAligned;
 
   private Map<String, SchemaMeasurementNode> aliasChildren;
@@ -138,4 +144,18 @@ public class SchemaEntityNode extends SchemaInternalNode {
     entityNode.setTemplateId(templateId);
     return entityNode;
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return SHALLOW_SIZE
+        + RamUsageEstimator.sizeOf(name)
+        + MemoryEstimationHelper.getEstimatedSizeOfMap(
+            children,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP_ENTRY)
+        + MemoryEstimationHelper.getEstimatedSizeOfMap(
+            aliasChildren,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
index 155396a0186..2700a454d13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.common.schematree.node;
 
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -30,6 +33,9 @@ import java.util.Map;
 
 public class SchemaInternalNode extends SchemaNode {
 
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(SchemaInternalNode.class);
+
   protected Map<String, SchemaNode> children = new HashMap<>();
 
   public SchemaInternalNode(String name) {
@@ -105,4 +111,14 @@ public class SchemaInternalNode extends SchemaNode {
 
     return new SchemaInternalNode(name);
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return SHALLOW_SIZE
+        + RamUsageEstimator.sizeOf(name)
+        + MemoryEstimationHelper.getEstimatedSizeOfMap(
+            children,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
index a896a57fdfd..583619539ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.common.schematree.node;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -34,6 +35,9 @@ import java.util.Map;
 
 public class SchemaMeasurementNode extends SchemaNode implements 
IMeasurementSchemaInfo {
 
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(SchemaMeasurementNode.class);
+
   private String alias;
   private IMeasurementSchema schema;
   private Map<String, String> tagMap;
@@ -44,6 +48,16 @@ public class SchemaMeasurementNode extends SchemaNode 
implements IMeasurementSch
     this.schema = schema;
   }
 
+  @Override
+  public long ramBytesUsed() {
+    return SHALLOW_SIZE
+        + RamUsageEstimator.sizeOf(name)
+        + RamUsageEstimator.sizeOf(alias)
+        + schema.ramBytesUsed()
+        + RamUsageEstimator.sizeOfMap(tagMap)
+        + RamUsageEstimator.sizeOfMap(attributeMap);
+  }
+
   public String getAlias() {
     return alias;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
index 0ca11bd0d7e..dd4e5e57a2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
@@ -21,13 +21,15 @@ package 
org.apache.iotdb.db.queryengine.common.schematree.node;
 
 import org.apache.iotdb.commons.schema.tree.ITreeNode;
 
+import org.apache.tsfile.utils.Accountable;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 
-public abstract class SchemaNode implements ITreeNode {
+public abstract class SchemaNode implements ITreeNode, Accountable {
 
   public static final byte SCHEMA_INTERNAL_NODE = 0;
   public static final byte SCHEMA_ENTITY_NODE = 1;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
index f7318865c2e..39a98c80a12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
@@ -30,7 +30,10 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MemoryEstimationHelper {
 
@@ -48,6 +51,13 @@ public class MemoryEstimationHelper {
   private static final long INTEGER_INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(Integer.class);
 
+  public static final long SHALLOW_SIZE_OF_HASHMAP =
+      RamUsageEstimator.shallowSizeOfInstance(HashMap.class);
+  public static final long SHALLOW_SIZE_OF_HASHMAP_ENTRY = 32;
+  public static final long SHALLOW_SIZE_OF_CONCURRENT_HASHMAP =
+      RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
+  public static final long SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY = 24;
+
   private MemoryEstimationHelper() {
     // hide the constructor
   }
@@ -119,4 +129,18 @@ public class MemoryEstimationHelper {
     size += INTEGER_INSTANCE_SIZE * integerArrayList.size();
     return RamUsageEstimator.alignObjectSize(size);
   }
+
+  public static long getEstimatedSizeOfMap(
+      Map<?, ? extends Accountable> map, long shallowSizeOfMap, long 
shallowSizeOfMapEntry) {
+    if (map == null) {
+      return 0;
+    }
+    long result = shallowSizeOfMap;
+    for (Map.Entry<?, ? extends Accountable> entry : map.entrySet()) {
+      result += RamUsageEstimator.sizeOfObject(entry.getKey());
+      result += entry.getValue() == null ? 0 : entry.getValue().ramBytesUsed();
+    }
+    result += map.size() * shallowSizeOfMapEntry;
+    return RamUsageEstimator.alignObjectSize(result);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
index d504490c055..f7207a97c48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.schemaengine.template.Template;
@@ -64,6 +65,7 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
   private final PathPatternTree authorityScope;
 
   private Iterator<SchemaNode> schemaNodeIteratorForSerialize = null;
+  private long schemaTreeMemCost;
   // Reserve some bytes to avoid capacity expansion
   private PublicBAOS baos = new PublicBAOS(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 
1024);
 
@@ -158,9 +160,14 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
       throw new NoSuchElementException();
     }
 
-    prepareSchemaNodeIterator();
+    boolean isFirstBatch = schemaNodeIteratorForSerialize == null;
+    prepareSchemaNodeIteratorForSerialize();
     // to indicate this binary data is database info
     ReadWriteIOUtils.write((byte) 1, baos);
+    // the estimated mem cost to deserialize the total schema tree
+    if (isFirstBatch) {
+      ReadWriteIOUtils.write(schemaTreeMemCost, baos);
+    }
     while (schemaNodeIteratorForSerialize.hasNext()
         && baos.size() < DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) {
       SchemaNode node = schemaNodeIteratorForSerialize.next();
@@ -172,7 +179,7 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
     if (isFinished) {
       // indicate all continuous binary data is finished
       currentBatch[0] = 2;
-      schemaNodeIteratorForSerialize = null;
+      releaseSchemaTree();
       baos = null;
     }
     return new TsBlock(
@@ -192,7 +199,7 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
 
   @Override
   public void close() throws Exception {
-    schemaNodeIteratorForSerialize = null;
+    releaseSchemaTree();
     baos = null;
   }
 
@@ -201,7 +208,7 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
     return sourceId;
   }
 
-  private void prepareSchemaNodeIterator() {
+  private void prepareSchemaNodeIteratorForSerialize() {
     if (schemaNodeIteratorForSerialize != null) {
       return;
     }
@@ -212,6 +219,10 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
               : schemaRegion.fetchSeriesSchema(
                   patternTree, templateMap, withTags, withAttributes, 
withTemplate, withAliasForce);
       schemaNodeIteratorForSerialize = schemaTree.getIteratorForSerialize();
+      schemaTreeMemCost = schemaTree.ramBytesUsed();
+      MemoryReservationManager memoryReservationContext =
+          operatorContext.getInstanceContext().getMemoryReservationContext();
+      memoryReservationContext.reserveMemoryCumulatively(schemaTreeMemCost);
     } catch (MetadataException e) {
       throw new SchemaExecutionException(e);
     }
@@ -238,4 +249,16 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
         + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId);
   }
+
+  private void releaseSchemaTree() {
+    if (schemaTreeMemCost <= 0) {
+      return;
+    }
+    operatorContext
+        .getInstanceContext()
+        .getMemoryReservationContext()
+        .releaseMemoryCumulatively(schemaTreeMemCost);
+    schemaTreeMemCost = 0;
+    schemaNodeIteratorForSerialize = null;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 9f43666a12b..8c1b8ec61e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -523,6 +523,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       context.setFetchSchemaCost(schemaFetchCost);
       QueryPlanCostMetricSet.getInstance().recordTreePlanCost(SCHEMA_FETCHER, 
schemaFetchCost);
     }
+    analysis.setSchemaTree(schemaTree);
     return schemaTree;
   }
 
@@ -3552,6 +3553,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       }
     }
     analysis.setSchemaTree(schemaTree);
+    context.setReleaseSchemaTreeAfterAnalyzing(false);
 
     Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
index 43cf67a283c..8641e4631c2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
@@ -43,8 +43,18 @@ public class Analyzer {
 
   public Analysis analyze(Statement statement) {
     long startTime = System.nanoTime();
-    Analysis analysis =
-        new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, 
context);
+    AnalyzeVisitor visitor = new AnalyzeVisitor(partitionFetcher, 
schemaFetcher);
+    Analysis analysis = null;
+    
context.setReserveMemoryForSchemaTreeFunc(context::reserveMemoryForFrontEnd);
+    try {
+      analysis = visitor.process(statement, context);
+    } finally {
+      if (analysis != null && context.releaseSchemaTreeAfterAnalyzing()) {
+        analysis.setSchemaTree(null);
+        context.releaseMemoryForSchemaTree();
+      }
+      context.setReserveMemoryForSchemaTreeFunc(null);
+    }
     if (context.getSession() != null) {
       // for test compatibility
       analysis.setDatabaseName(context.getDatabaseName().orElse(null));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 0d0e11e4019..c0eb25a649e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -276,7 +276,7 @@ class ClusterSchemaFetchExecutor {
           }
           Column column = tsBlock.get().getColumn(0);
           for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet);
+            parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet, context);
           }
         }
         result.setDatabases(databaseSet);
@@ -294,7 +294,8 @@ class ClusterSchemaFetchExecutor {
       Binary data,
       ClusterSchemaTree resultSchemaTree,
       ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer,
-      Set<String> databaseSet) {
+      Set<String> databaseSet,
+      MPPQueryContext context) {
     InputStream inputStream = new ByteArrayInputStream(data.getValues());
     try {
       byte type = ReadWriteIOUtils.readByte(inputStream);
@@ -303,11 +304,16 @@ class ClusterSchemaFetchExecutor {
         for (int i = 0; i < size; i++) {
           databaseSet.add(ReadWriteIOUtils.readString(inputStream));
         }
-      } else if (type == 1) {
-        deserializer.deserializeFromBatch(inputStream);
-      } else if (type == 2) {
+      } else if (type == 1 || type == 2) {
+        if (deserializer.isFirstBatch()) {
+          long memCost = ReadWriteIOUtils.readLong(inputStream);
+          context.reserveMemoryForSchemaTree(memCost);
+        }
         deserializer.deserializeFromBatch(inputStream);
-        resultSchemaTree.mergeSchemaTree(deserializer.finish());
+        if (type == 2) {
+          // 'type == 2' indicates this batch is finished
+          resultSchemaTree.mergeSchemaTree(deserializer.finish());
+        }
       } else {
         throw new RuntimeException(
             new MetadataException("Failed to fetch schema because of 
unrecognized data"));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java
index a23fcb78541..7d0a97d9b8f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java
@@ -20,11 +20,14 @@
 package org.apache.iotdb.db.schemaengine.template;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -39,8 +42,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class Template implements Serializable {
+public class Template implements Serializable, Accountable {
 
+  private static final long SHALLOW_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(Template.class);
   private int id;
   private String name;
   private boolean isDirectAligned;
@@ -226,4 +230,14 @@ public class Template implements Serializable {
   public int hashCode() {
     return new HashCodeBuilder(17, 
37).append(name).append(schemaMap).toHashCode();
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return SHALLOW_SIZE
+        + RamUsageEstimator.sizeOf(name)
+        + MemoryEstimationHelper.getEstimatedSizeOfMap(
+            schemaMap,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY);
+  }
 }

Reply via email to