This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch transferSchemaTreeInBatches
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/transferSchemaTreeInBatches by
this push:
new 212981062dd add memory control for schema tree
212981062dd is described below
commit 212981062dd49dd1939a4d02e54f2caa7a2bcfb2
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 25 12:25:56 2025 +0800
add memory control for schema tree
---
.../db/queryengine/common/MPPQueryContext.java | 33 +++++++++++++++++
.../common/schematree/ClusterSchemaTree.java | 41 ++++++++++++++++++----
.../queryengine/common/schematree/ISchemaTree.java | 3 +-
.../common/schematree/node/SchemaEntityNode.java | 20 +++++++++++
.../common/schematree/node/SchemaInternalNode.java | 16 +++++++++
.../schematree/node/SchemaMeasurementNode.java | 14 ++++++++
.../common/schematree/node/SchemaNode.java | 4 ++-
.../execution/MemoryEstimationHelper.java | 24 +++++++++++++
.../operator/schema/SchemaFetchScanOperator.java | 31 +++++++++++++---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 2 ++
.../db/queryengine/plan/analyze/Analyzer.java | 14 ++++++--
.../analyze/schema/ClusterSchemaFetchExecutor.java | 18 ++++++----
.../iotdb/db/schemaengine/template/Template.java | 16 ++++++++-
13 files changed, 214 insertions(+), 22 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index b4bd87b2e7e..080a8f1b0de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.function.LongConsumer;
/**
* This class is used to record the context of a query including QueryId,
query statement, session
@@ -89,6 +90,10 @@ public class MPPQueryContext {
// the updateScanNum process in distributed planning can be skipped.
private boolean needUpdateScanNumForLastQuery = false;
+ private long reservedMemoryCostForSchemaTree = 0;
+ private boolean releaseSchemaTreeAfterAnalyzing = true;
+ private LongConsumer reserveMemoryForSchemaTreeFunc = null;
+
private boolean userQuery = false;
public MPPQueryContext(QueryId queryId) {
@@ -129,6 +134,34 @@ public class MPPQueryContext {
this.initResultNodeContext();
}
+ /**
+ * Installs the callback that {@link #reserveMemoryForSchemaTree(long)} invokes to reserve
+ * memory. Passing {@code null} turns that method into a no-op.
+ */
+ public void setReserveMemoryForSchemaTreeFunc(LongConsumer
reserveMemoryForSchemaTreeFunc) {
+ this.reserveMemoryForSchemaTreeFunc = reserveMemoryForSchemaTreeFunc;
+ }
+
+  /**
+   * Reserves the given amount of memory for a schema tree through the installed callback and adds
+   * it to the running total that {@link #releaseMemoryForSchemaTree()} later releases. Does
+   * nothing when no callback is installed.
+   *
+   * @param memoryCost amount of memory to reserve
+   */
+  public void reserveMemoryForSchemaTree(long memoryCost) {
+    if (reserveMemoryForSchemaTreeFunc != null) {
+      // Reserve first: the total is only updated when the callback returns normally.
+      reserveMemoryForSchemaTreeFunc.accept(memoryCost);
+      reservedMemoryCostForSchemaTree += memoryCost;
+    }
+  }
+
+ /**
+ * Sets whether the schema tree should be released once analysis finishes. The flag is
+ * initialized to {@code true}.
+ */
+ public void setReleaseSchemaTreeAfterAnalyzing(boolean
releaseSchemaTreeAfterAnalyzing) {
+ this.releaseSchemaTreeAfterAnalyzing = releaseSchemaTreeAfterAnalyzing;
+ }
+
+ /** Returns the release-after-analyzing flag ({@code true} unless it has been changed). */
+ public boolean releaseSchemaTreeAfterAnalyzing() {
+ return releaseSchemaTreeAfterAnalyzing;
+ }
+
+  /**
+   * Releases, through the memory reservation manager, everything recorded by {@link
+   * #reserveMemoryForSchemaTree(long)} and resets the recorded total to zero. Does nothing when
+   * the recorded total is not positive, so repeated calls are harmless.
+   */
+  public void releaseMemoryForSchemaTree() {
+    if (reservedMemoryCostForSchemaTree > 0) {
+      memoryReservationManager.releaseMemoryCumulatively(reservedMemoryCostForSchemaTree);
+      reservedMemoryCostForSchemaTree = 0;
+    }
+  }
+
public void prepareForRetry() {
this.initResultNodeContext();
this.releaseAllMemoryReservedForFrontEnd();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
index 6e9806a4ee4..6d6182a4e68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -63,6 +64,8 @@ import static
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.
import static
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
public class ClusterSchemaTree implements ISchemaTree {
+ private static final long SHALLOW_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(ClusterSchemaTree.class);
private static final ClusterTemplateManager templateManager =
ClusterTemplateManager.getInstance();
@@ -78,6 +81,8 @@ public class ClusterSchemaTree implements ISchemaTree {
private Map<Integer, Template> templateMap = new HashMap<>();
+ private long memCost;
+
public ClusterSchemaTree() {
root = new SchemaInternalNode(PATH_ROOT);
}
@@ -491,6 +496,21 @@ public class ClusterSchemaTree implements ISchemaTree {
return new SchemaNodePostOrderIterator(root);
}
+  /**
+   * Returns the estimated memory footprint of this schema tree.
+   *
+   * <p>The estimate is the node hierarchy under {@code root} plus the shallow size of this object
+   * and the size of the template map. It is computed on first use and cached in {@code memCost};
+   * a positive value supplied through {@link #setMemCost(long)} is returned as-is.
+   *
+   * <p>NOTE(review): this method never invalidates the cached value; confirm that callers do not
+   * expect a fresh estimate after the tree is modified.
+   */
+  @Override
+  public long ramBytesUsed() {
+    if (memCost > 0) {
+      return memCost;
+    }
+    // Debug timing output to stdout was removed; this runs on the query path.
+    memCost = root.ramBytesUsed() + SHALLOW_SIZE + RamUsageEstimator.sizeOfMap(templateMap);
+    return memCost;
+  }
+
+ /**
+ * Overrides the cached memory estimate. {@link #ramBytesUsed()} returns this value directly
+ * while it is positive and recomputes the estimate otherwise.
+ */
+ public void setMemCost(long memCost) {
+ this.memCost = memCost;
+ }
+
private static class SchemaNodePostOrderIterator implements
Iterator<SchemaNode> {
private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new
ArrayDeque<>();
private SchemaNode nextNode;
@@ -534,15 +554,21 @@ public class ClusterSchemaTree implements ISchemaTree {
}
public static class SchemaNodeBatchDeserializer {
- byte nodeType;
- int childNum;
- Deque<SchemaNode> stack = new ArrayDeque<>();
- SchemaNode child;
- boolean hasLogicalView = false;
- boolean hasNormalTimeSeries = false;
- Map<Integer, Template> templateMap = new HashMap<>();
+ private byte nodeType;
+ private int childNum;
+ private Deque<SchemaNode> stack = new ArrayDeque<>();
+ private SchemaNode child;
+ private boolean hasLogicalView = false;
+ private boolean hasNormalTimeSeries = false;
+ private Map<Integer, Template> templateMap = new HashMap<>();
+ private boolean isFirstBatch = true;
+
+ /**
+ * Returns {@code true} while no batch has been passed to {@code deserializeFromBatch} since this
+ * deserializer was created or its state was last reset.
+ */
+ public boolean isFirstBatch() {
+ return isFirstBatch;
+ }
public void deserializeFromBatch(InputStream inputStream) throws
IOException {
+ isFirstBatch = false;
while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
@@ -598,6 +624,7 @@ public class ClusterSchemaTree implements ISchemaTree {
hasLogicalView = false;
hasNormalTimeSeries = false;
templateMap = new HashMap<>();
+ isFirstBatch = true;
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
index 823f7f94c00..543991a6ff7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
@@ -25,12 +25,13 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.Pair;
import java.util.List;
import java.util.Set;
-public interface ISchemaTree {
+public interface ISchemaTree extends Accountable {
/**
* Return all measurement paths for given path pattern and filter the result
by slimit and offset.
*
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
index c160a81f20f..2cb8ede8023 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.queryengine.common.schematree.node;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -31,6 +34,9 @@ import static
org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
public class SchemaEntityNode extends SchemaInternalNode {
+ private static final long SHALLOW_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SchemaEntityNode.class);
+
private boolean isAligned;
private Map<String, SchemaMeasurementNode> aliasChildren;
@@ -138,4 +144,18 @@ public class SchemaEntityNode extends SchemaInternalNode {
entityNode.setTemplateId(templateId);
return entityNode;
}
+
+  /**
+   * Estimates the memory held by this entity node: its shallow size, its name, the {@code
+   * children} map and the {@code aliasChildren} map, each map including the nodes it contains.
+   *
+   * <p>NOTE(review): if alias entries point at measurement nodes that are also stored in {@code
+   * children}, those nodes are counted twice; confirm whether that over-estimate is intended.
+   */
+  @Override
+  public long ramBytesUsed() {
+    long total = SHALLOW_SIZE + RamUsageEstimator.sizeOf(name);
+    total +=
+        MemoryEstimationHelper.getEstimatedSizeOfMap(
+            children,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+    total +=
+        MemoryEstimationHelper.getEstimatedSizeOfMap(
+            aliasChildren,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+    return total;
+  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
index 155396a0186..2700a454d13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.queryengine.common.schematree.node;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -30,6 +33,9 @@ import java.util.Map;
public class SchemaInternalNode extends SchemaNode {
+ private static final long SHALLOW_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SchemaInternalNode.class);
+
protected Map<String, SchemaNode> children = new HashMap<>();
public SchemaInternalNode(String name) {
@@ -105,4 +111,14 @@ public class SchemaInternalNode extends SchemaNode {
return new SchemaInternalNode(name);
}
+
+  /**
+   * Estimates the memory held by this internal node: its shallow size, its name and the {@code
+   * children} map together with every child node stored in it.
+   */
+  @Override
+  public long ramBytesUsed() {
+    long sizeOfChildren =
+        MemoryEstimationHelper.getEstimatedSizeOfMap(
+            children,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+    return SHALLOW_SIZE + RamUsageEstimator.sizeOf(name) + sizeOfChildren;
+  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
index a896a57fdfd..583619539ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.common.schematree.node;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -34,6 +35,9 @@ import java.util.Map;
public class SchemaMeasurementNode extends SchemaNode implements
IMeasurementSchemaInfo {
+ private static final long SHALLOW_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SchemaMeasurementNode.class);
+
private String alias;
private IMeasurementSchema schema;
private Map<String, String> tagMap;
@@ -44,6 +48,16 @@ public class SchemaMeasurementNode extends SchemaNode
implements IMeasurementSch
this.schema = schema;
}
+  /**
+   * Estimates the memory held by this measurement node: its shallow size plus the name, alias,
+   * measurement schema, tag map and attribute map.
+   *
+   * <p>NOTE(review): {@code schema} is dereferenced without a null check; confirm that it can
+   * never be null here.
+   */
+  @Override
+  public long ramBytesUsed() {
+    long total = SHALLOW_SIZE;
+    total += RamUsageEstimator.sizeOf(name);
+    total += RamUsageEstimator.sizeOf(alias);
+    total += schema.ramBytesUsed();
+    total += RamUsageEstimator.sizeOfMap(tagMap);
+    total += RamUsageEstimator.sizeOfMap(attributeMap);
+    return total;
+  }
+
public String getAlias() {
return alias;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
index 0ca11bd0d7e..dd4e5e57a2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
@@ -21,13 +21,15 @@ package
org.apache.iotdb.db.queryengine.common.schematree.node;
import org.apache.iotdb.commons.schema.tree.ITreeNode;
+import org.apache.tsfile.utils.Accountable;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
-public abstract class SchemaNode implements ITreeNode {
+public abstract class SchemaNode implements ITreeNode, Accountable {
public static final byte SCHEMA_INTERNAL_NODE = 0;
public static final byte SCHEMA_ENTITY_NODE = 1;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
index f7318865c2e..39a98c80a12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
@@ -30,7 +30,10 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class MemoryEstimationHelper {
@@ -48,6 +51,13 @@ public class MemoryEstimationHelper {
private static final long INTEGER_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
+ public static final long SHALLOW_SIZE_OF_HASHMAP =
+ RamUsageEstimator.shallowSizeOfInstance(HashMap.class);
+ public static final long SHALLOW_SIZE_OF_HASHMAP_ENTRY = 32;
+ public static final long SHALLOW_SIZE_OF_CONCURRENT_HASHMAP =
+ RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
+ public static final long SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY = 24;
+
private MemoryEstimationHelper() {
// hide the constructor
}
@@ -119,4 +129,18 @@ public class MemoryEstimationHelper {
size += INTEGER_INSTANCE_SIZE * integerArrayList.size();
return RamUsageEstimator.alignObjectSize(size);
}
+
+  /**
+   * Estimates the memory used by a map whose values report their own size.
+   *
+   * <p>The result is the map's shallow size, plus the size of every key and every non-null value,
+   * plus a fixed per-entry overhead, rounded by {@code RamUsageEstimator.alignObjectSize}.
+   *
+   * @param map the map to measure; {@code null} yields 0
+   * @param shallowSizeOfMap shallow size of the map object itself
+   * @param shallowSizeOfMapEntry overhead charged for each entry
+   * @return the estimated size
+   */
+  public static long getEstimatedSizeOfMap(
+      Map<?, ? extends Accountable> map, long shallowSizeOfMap, long shallowSizeOfMapEntry) {
+    if (map == null) {
+      return 0;
+    }
+    long total = shallowSizeOfMap;
+    for (Map.Entry<?, ? extends Accountable> mapEntry : map.entrySet()) {
+      total += RamUsageEstimator.sizeOfObject(mapEntry.getKey());
+      Accountable value = mapEntry.getValue();
+      if (value != null) {
+        total += value.ramBytesUsed();
+      }
+    }
+    total += map.size() * shallowSizeOfMapEntry;
+    return RamUsageEstimator.alignObjectSize(total);
+  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
index d504490c055..f7207a97c48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -64,6 +65,7 @@ public class SchemaFetchScanOperator implements
SourceOperator {
private final PathPatternTree authorityScope;
private Iterator<SchemaNode> schemaNodeIteratorForSerialize = null;
+ private long schemaTreeMemCost;
// Reserve some bytes to avoid capacity expansion
private PublicBAOS baos = new PublicBAOS(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES +
1024);
@@ -158,9 +160,14 @@ public class SchemaFetchScanOperator implements
SourceOperator {
throw new NoSuchElementException();
}
- prepareSchemaNodeIterator();
+ boolean isFirstBatch = schemaNodeIteratorForSerialize == null;
+ prepareSchemaNodeIteratorForSerialize();
// to indicate this binary data is database info
ReadWriteIOUtils.write((byte) 1, baos);
+ // the estimated mem cost to deserialize the total schema tree
+ if (isFirstBatch) {
+ ReadWriteIOUtils.write(schemaTreeMemCost, baos);
+ }
while (schemaNodeIteratorForSerialize.hasNext()
&& baos.size() < DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) {
SchemaNode node = schemaNodeIteratorForSerialize.next();
@@ -172,7 +179,7 @@ public class SchemaFetchScanOperator implements
SourceOperator {
if (isFinished) {
// indicate all continuous binary data is finished
currentBatch[0] = 2;
- schemaNodeIteratorForSerialize = null;
+ releaseSchemaTree();
baos = null;
}
return new TsBlock(
@@ -192,7 +199,7 @@ public class SchemaFetchScanOperator implements
SourceOperator {
@Override
public void close() throws Exception {
- schemaNodeIteratorForSerialize = null;
+ releaseSchemaTree();
baos = null;
}
@@ -201,7 +208,7 @@ public class SchemaFetchScanOperator implements
SourceOperator {
return sourceId;
}
- private void prepareSchemaNodeIterator() {
+ private void prepareSchemaNodeIteratorForSerialize() {
if (schemaNodeIteratorForSerialize != null) {
return;
}
@@ -212,6 +219,10 @@ public class SchemaFetchScanOperator implements
SourceOperator {
: schemaRegion.fetchSeriesSchema(
patternTree, templateMap, withTags, withAttributes,
withTemplate, withAliasForce);
schemaNodeIteratorForSerialize = schemaTree.getIteratorForSerialize();
+ schemaTreeMemCost = schemaTree.ramBytesUsed();
+ MemoryReservationManager memoryReservationContext =
+ operatorContext.getInstanceContext().getMemoryReservationContext();
+ memoryReservationContext.reserveMemoryCumulatively(schemaTreeMemCost);
} catch (MetadataException e) {
throw new SchemaExecutionException(e);
}
@@ -238,4 +249,16 @@ public class SchemaFetchScanOperator implements
SourceOperator {
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId);
}
+
+  /**
+   * Drops the schema-node iterator and hands the memory recorded in {@code schemaTreeMemCost}
+   * back to the instance's memory reservation context.
+   *
+   * <p>The recorded cost is zeroed after the release, so calling this again (for example from
+   * both the final batch and {@code close()}) does not release the same amount twice.
+   */
+  private void releaseSchemaTree() {
+    // Always drop the iterator, even when nothing was reserved, so the reference is not retained.
+    schemaNodeIteratorForSerialize = null;
+    if (schemaTreeMemCost <= 0) {
+      return;
+    }
+    operatorContext
+        .getInstanceContext()
+        .getMemoryReservationContext()
+        .releaseMemoryCumulatively(schemaTreeMemCost);
+    schemaTreeMemCost = 0;
+  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 9f43666a12b..8c1b8ec61e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -523,6 +523,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setFetchSchemaCost(schemaFetchCost);
QueryPlanCostMetricSet.getInstance().recordTreePlanCost(SCHEMA_FETCHER,
schemaFetchCost);
}
+ analysis.setSchemaTree(schemaTree);
return schemaTree;
}
@@ -3552,6 +3553,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
analysis.setSchemaTree(schemaTree);
+ context.setReleaseSchemaTreeAfterAnalyzing(false);
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
index 43cf67a283c..8641e4631c2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
@@ -43,8 +43,18 @@ public class Analyzer {
public Analysis analyze(Statement statement) {
long startTime = System.nanoTime();
- Analysis analysis =
- new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement,
context);
+ AnalyzeVisitor visitor = new AnalyzeVisitor(partitionFetcher,
schemaFetcher);
+ Analysis analysis = null;
+
context.setReserveMemoryForSchemaTreeFunc(context::reserveMemoryForFrontEnd);
+ try {
+ analysis = visitor.process(statement, context);
+ } finally {
+ if (analysis != null && context.releaseSchemaTreeAfterAnalyzing()) {
+ analysis.setSchemaTree(null);
+ context.releaseMemoryForSchemaTree();
+ }
+ context.setReserveMemoryForSchemaTreeFunc(null);
+ }
if (context.getSession() != null) {
// for test compatibility
analysis.setDatabaseName(context.getDatabaseName().orElse(null));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 0d0e11e4019..c0eb25a649e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -276,7 +276,7 @@ class ClusterSchemaFetchExecutor {
}
Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
- parseFetchedData(column.getBinary(i), result, deserializer,
databaseSet);
+ parseFetchedData(column.getBinary(i), result, deserializer,
databaseSet, context);
}
}
result.setDatabases(databaseSet);
@@ -294,7 +294,8 @@ class ClusterSchemaFetchExecutor {
Binary data,
ClusterSchemaTree resultSchemaTree,
ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer,
- Set<String> databaseSet) {
+ Set<String> databaseSet,
+ MPPQueryContext context) {
InputStream inputStream = new ByteArrayInputStream(data.getValues());
try {
byte type = ReadWriteIOUtils.readByte(inputStream);
@@ -303,11 +304,16 @@ class ClusterSchemaFetchExecutor {
for (int i = 0; i < size; i++) {
databaseSet.add(ReadWriteIOUtils.readString(inputStream));
}
- } else if (type == 1) {
- deserializer.deserializeFromBatch(inputStream);
- } else if (type == 2) {
+ } else if (type == 1 || type == 2) {
+ if (deserializer.isFirstBatch()) {
+ long memCost = ReadWriteIOUtils.readLong(inputStream);
+ context.reserveMemoryForSchemaTree(memCost);
+ }
deserializer.deserializeFromBatch(inputStream);
- resultSchemaTree.mergeSchemaTree(deserializer.finish());
+ if (type == 2) {
+ // 'type == 2' indicates this batch is finished
+ resultSchemaTree.mergeSchemaTree(deserializer.finish());
+ }
} else {
throw new RuntimeException(
new MetadataException("Failed to fetch schema because of
unrecognized data"));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java
index a23fcb78541..7d0a97d9b8f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/Template.java
@@ -20,11 +20,14 @@
package org.apache.iotdb.db.schemaengine.template;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -39,8 +42,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class Template implements Serializable {
+public class Template implements Serializable, Accountable {
+ private static final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(Template.class);
private int id;
private String name;
private boolean isDirectAligned;
@@ -226,4 +230,14 @@ public class Template implements Serializable {
public int hashCode() {
return new HashCodeBuilder(17,
37).append(name).append(schemaMap).toHashCode();
}
+
+  /**
+   * Estimates the memory held by this template: its shallow size, its name and the {@code
+   * schemaMap} together with the schemas stored in it.
+   *
+   * <p>NOTE(review): the map overhead uses the ConcurrentHashMap constants; presumably {@code
+   * schemaMap} is a {@code ConcurrentHashMap} — confirm against the field declaration.
+   */
+  @Override
+  public long ramBytesUsed() {
+    long sizeOfSchemaMap =
+        MemoryEstimationHelper.getEstimatedSizeOfMap(
+            schemaMap,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP,
+            MemoryEstimationHelper.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY);
+    return SHALLOW_SIZE + RamUsageEstimator.sizeOf(name) + sizeOfSchemaMap;
+  }
}