This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch transferSchemaTreeInBatches
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit da522b91201c06e2297a2f9ae5dff8d92fc7712f
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jul 24 11:05:33 2025 +0800

    Transfer schema tree in batches
---
 .../common/schematree/ClusterSchemaTree.java       | 140 +++++++++++++++------
 .../common/schematree/node/SchemaEntityNode.java   |   5 +
 .../common/schematree/node/SchemaInternalNode.java |   3 +
 .../schematree/node/SchemaMeasurementNode.java     |   4 +
 .../common/schematree/node/SchemaNode.java         |   2 +
 .../operator/schema/SchemaFetchScanOperator.java   |  67 ++++++----
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   2 -
 .../analyze/schema/ClusterSchemaFetchExecutor.java |  14 ++-
 8 files changed, 170 insertions(+), 67 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
index 91781dcca0c..6e9806a4ee4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
@@ -50,8 +50,10 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
@@ -485,8 +487,53 @@ public class ClusterSchemaTree implements ISchemaTree {
     root.serialize(outputStream);
   }
 
-  public static ClusterSchemaTree deserialize(InputStream inputStream) throws 
IOException {
+  public Iterator<SchemaNode> getIteratorForSerialize() {
+    return new SchemaNodePostOrderIterator(root);
+  }
+
+  private static class SchemaNodePostOrderIterator implements 
Iterator<SchemaNode> {
+    private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new 
ArrayDeque<>();
+    private SchemaNode nextNode;
+
+    public SchemaNodePostOrderIterator(SchemaNode root) {
+      stack.push(new Pair<>(root, root.getChildrenIterator()));
+      prepareNext();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextNode != null;
+    }
+
+    @Override
+    public SchemaNode next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      SchemaNode result = nextNode;
+      prepareNext();
+      return result;
+    }
+
+    private void prepareNext() {
+      nextNode = null;
+      while (!stack.isEmpty()) {
+        Pair<SchemaNode, Iterator<SchemaNode>> pair = stack.peek();
+        SchemaNode currentNode = pair.getLeft();
+        Iterator<SchemaNode> childrenIterator = pair.getRight();
+        if (childrenIterator.hasNext()) {
+          SchemaNode child = childrenIterator.next();
+          stack.push(new Pair<>(child, child.getChildrenIterator()));
+        } else {
+          stack.pop();
+          nextNode = currentNode;
+          return;
+        }
+      }
+    }
+  }
 
+  public static class SchemaNodeBatchDeserializer {
     byte nodeType;
     int childNum;
     Deque<SchemaNode> stack = new ArrayDeque<>();
@@ -495,49 +542,70 @@ public class ClusterSchemaTree implements ISchemaTree {
     boolean hasNormalTimeSeries = false;
     Map<Integer, Template> templateMap = new HashMap<>();
 
-    while (inputStream.available() > 0) {
-      nodeType = ReadWriteIOUtils.readByte(inputStream);
-      if (nodeType == SCHEMA_MEASUREMENT_NODE) {
-        SchemaMeasurementNode measurementNode = 
SchemaMeasurementNode.deserialize(inputStream);
-        stack.push(measurementNode);
-        if (measurementNode.isLogicalView()) {
-          hasLogicalView = true;
-        }
-        hasNormalTimeSeries = true;
-      } else {
-        SchemaInternalNode internalNode;
-        if (nodeType == SCHEMA_ENTITY_NODE) {
-          internalNode = SchemaEntityNode.deserialize(inputStream);
-          int templateId = internalNode.getAsEntityNode().getTemplateId();
-          if (templateId != NON_TEMPLATE) {
-            templateMap.putIfAbsent(templateId, 
templateManager.getTemplate(templateId));
+    public void deserializeFromBatch(InputStream inputStream) throws 
IOException {
+      while (inputStream.available() > 0) {
+        nodeType = ReadWriteIOUtils.readByte(inputStream);
+        if (nodeType == SCHEMA_MEASUREMENT_NODE) {
+          SchemaMeasurementNode measurementNode = 
SchemaMeasurementNode.deserialize(inputStream);
+          stack.push(measurementNode);
+          if (measurementNode.isLogicalView()) {
+            hasLogicalView = true;
           }
+          hasNormalTimeSeries = true;
         } else {
-          internalNode = SchemaInternalNode.deserialize(inputStream);
-        }
+          SchemaInternalNode internalNode;
+          if (nodeType == SCHEMA_ENTITY_NODE) {
+            internalNode = SchemaEntityNode.deserialize(inputStream);
+            int templateId = internalNode.getAsEntityNode().getTemplateId();
+            if (templateId != NON_TEMPLATE) {
+              templateMap.putIfAbsent(templateId, 
templateManager.getTemplate(templateId));
+            }
+          } else {
+            internalNode = SchemaInternalNode.deserialize(inputStream);
+          }
 
-        childNum = ReadWriteIOUtils.readInt(inputStream);
-        while (childNum > 0) {
-          child = stack.pop();
-          internalNode.addChild(child.getName(), child);
-          if (child.isMeasurement()) {
-            SchemaMeasurementNode measurementNode = 
child.getAsMeasurementNode();
-            if (measurementNode.getAlias() != null) {
-              internalNode
-                  .getAsEntityNode()
-                  .addAliasChild(measurementNode.getAlias(), measurementNode);
+          childNum = ReadWriteIOUtils.readInt(inputStream);
+          while (childNum > 0) {
+            child = stack.pop();
+            internalNode.addChild(child.getName(), child);
+            if (child.isMeasurement()) {
+              SchemaMeasurementNode measurementNode = 
child.getAsMeasurementNode();
+              if (measurementNode.getAlias() != null) {
+                internalNode
+                    .getAsEntityNode()
+                    .addAliasChild(measurementNode.getAlias(), 
measurementNode);
+              }
             }
+            childNum--;
           }
-          childNum--;
+          stack.push(internalNode);
         }
-        stack.push(internalNode);
       }
     }
-    ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
-    result.templateMap = templateMap;
-    result.hasLogicalMeasurementPath = hasLogicalView;
-    result.hasNormalTimeSeries = hasNormalTimeSeries;
-    return result;
+
+    public ClusterSchemaTree finish() {
+      try {
+        ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
+        result.templateMap = templateMap;
+        result.hasLogicalMeasurementPath = hasLogicalView;
+        result.hasNormalTimeSeries = hasNormalTimeSeries;
+        return result;
+      } finally {
+        nodeType = 0;
+        childNum = 0;
+        stack.clear();
+        child = null;
+        hasLogicalView = false;
+        hasNormalTimeSeries = false;
+        templateMap = new HashMap<>();
+      }
+    }
+  }
+
+  public static ClusterSchemaTree deserialize(InputStream inputStream) throws 
IOException {
+    SchemaNodeBatchDeserializer schemaNodeBatchDeserializer = new 
SchemaNodeBatchDeserializer();
+    schemaNodeBatchDeserializer.deserializeFromBatch(inputStream);
+    return schemaNodeBatchDeserializer.finish();
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
index b877dc9d7a2..c160a81f20f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java
@@ -117,6 +117,11 @@ public class SchemaEntityNode extends SchemaInternalNode {
   @Override
   public void serialize(OutputStream outputStream) throws IOException {
     serializeChildren(outputStream);
+    this.serializeNodeOwnContent(outputStream);
+  }
+
+  @Override
+  public void serializeNodeOwnContent(OutputStream outputStream) throws 
IOException {
     ReadWriteIOUtils.write(getType(), outputStream);
     ReadWriteIOUtils.write(name, outputStream);
     ReadWriteIOUtils.write(isAligned, outputStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
index 5c6de241baf..155396a0186 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
@@ -85,7 +85,10 @@ public class SchemaInternalNode extends SchemaNode {
 
   public void serialize(OutputStream outputStream) throws IOException {
     serializeChildren(outputStream);
+    serializeNodeOwnContent(outputStream);
+  }
 
+  public void serializeNodeOwnContent(OutputStream outputStream) throws 
IOException {
     ReadWriteIOUtils.write(getType(), outputStream);
     ReadWriteIOUtils.write(name, outputStream);
     ReadWriteIOUtils.write(children.size(), outputStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
index b1eaa7ff5c1..a896a57fdfd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
@@ -143,6 +143,10 @@ public class SchemaMeasurementNode extends SchemaNode implements IMeasurementSch
 
   @Override
   public void serialize(OutputStream outputStream) throws IOException {
+    serializeNodeOwnContent(outputStream);
+  }
+
+  public void serializeNodeOwnContent(OutputStream outputStream) throws 
IOException {
     ReadWriteIOUtils.write(getType(), outputStream);
     ReadWriteIOUtils.write(name, outputStream);
     ReadWriteIOUtils.write(alias, outputStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
index e2625cd97ac..0ca11bd0d7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
@@ -80,4 +80,6 @@ public abstract class SchemaNode implements ITreeNode {
   public abstract byte getType();
 
   public abstract void serialize(OutputStream outputStream) throws IOException;
+
+  public abstract void serializeNodeOwnContent(OutputStream outputStream) 
throws IOException;
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
index f4b40c69b79..d504490c055 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.runtime.SchemaExecutionException;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
@@ -36,12 +37,12 @@ import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.tsfile.read.common.block.column.TimeColumn;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -62,6 +63,10 @@ public class SchemaFetchScanOperator implements SourceOperator {
   private boolean isFinished = false;
   private final PathPatternTree authorityScope;
 
+  private Iterator<SchemaNode> schemaNodeIteratorForSerialize = null;
+  // Reserve some bytes to avoid capacity expansion
+  private PublicBAOS baos = new PublicBAOS(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 
1024);
+
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
@@ -152,12 +157,27 @@ public class SchemaFetchScanOperator implements SourceOperator {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
-    isFinished = true;
-    try {
-      return fetchSchema();
-    } catch (MetadataException e) {
-      throw new SchemaExecutionException(e);
+
+    prepareSchemaNodeIterator();
+    // to indicate this binary data is database info
+    ReadWriteIOUtils.write((byte) 1, baos);
+    while (schemaNodeIteratorForSerialize.hasNext()
+        && baos.size() < DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) {
+      SchemaNode node = schemaNodeIteratorForSerialize.next();
+      node.serializeNodeOwnContent(baos);
+    }
+    byte[] currentBatch = baos.toByteArray();
+    baos.reset();
+    isFinished = !schemaNodeIteratorForSerialize.hasNext();
+    if (isFinished) {
+      // indicate all continuous binary data is finished
+      currentBatch[0] = 2;
+      schemaNodeIteratorForSerialize = null;
+      baos = null;
     }
+    return new TsBlock(
+        new TimeColumn(1, new long[] {0}),
+        new BinaryColumn(1, Optional.empty(), new Binary[] {new 
Binary(currentBatch)}));
   }
 
   @Override
@@ -172,7 +192,8 @@ public class SchemaFetchScanOperator implements SourceOperator {
 
   @Override
   public void close() throws Exception {
-    // do nothing
+    schemaNodeIteratorForSerialize = null;
+    baos = null;
   }
 
   @Override
@@ -180,26 +201,20 @@ public class SchemaFetchScanOperator implements SourceOperator {
     return sourceId;
   }
 
-  private TsBlock fetchSchema() throws MetadataException {
-    ClusterSchemaTree schemaTree =
-        fetchDevice
-            ? schemaRegion.fetchDeviceSchema(patternTree, authorityScope)
-            : schemaRegion.fetchSeriesSchema(
-                patternTree, templateMap, withTags, withAttributes, 
withTemplate, withAliasForce);
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+  private void prepareSchemaNodeIterator() {
+    if (schemaNodeIteratorForSerialize != null) {
+      return;
+    }
     try {
-      // to indicate this binary data is database info
-      ReadWriteIOUtils.write((byte) 1, outputStream);
-
-      schemaTree.serialize(outputStream);
-    } catch (IOException e) {
-      // Totally memory operation. This case won't happen.
+      ClusterSchemaTree schemaTree =
+          fetchDevice
+              ? schemaRegion.fetchDeviceSchema(patternTree, authorityScope)
+              : schemaRegion.fetchSeriesSchema(
+                  patternTree, templateMap, withTags, withAttributes, 
withTemplate, withAliasForce);
+      schemaNodeIteratorForSerialize = schemaTree.getIteratorForSerialize();
+    } catch (MetadataException e) {
+      throw new SchemaExecutionException(e);
     }
-    return new TsBlock(
-        new TimeColumn(1, new long[] {0}),
-        new BinaryColumn(
-            1, Optional.empty(), new Binary[] {new 
Binary(outputStream.toByteArray())}));
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 6009b07011e..9f43666a12b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -523,8 +523,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       context.setFetchSchemaCost(schemaFetchCost);
       QueryPlanCostMetricSet.getInstance().recordTreePlanCost(SCHEMA_FETCHER, 
schemaFetchCost);
     }
-
-    analysis.setSchemaTree(schemaTree);
     return schemaTree;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 2153a0ccf4b..0d0e11e4019 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -257,6 +257,8 @@ class ClusterSchemaFetchExecutor {
       }
       try (SetThreadName ignored = new 
SetThreadName(executionResult.queryId.getId())) {
         ClusterSchemaTree result = new ClusterSchemaTree();
+        ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
+            new ClusterSchemaTree.SchemaNodeBatchDeserializer();
         Set<String> databaseSet = new HashSet<>();
         while (coordinator.getQueryExecution(queryId).hasNextResult()) {
           // The query will be transited to FINISHED when invoking 
getBatchResult() at the last time
@@ -274,7 +276,7 @@ class ClusterSchemaFetchExecutor {
           }
           Column column = tsBlock.get().getColumn(0);
           for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, databaseSet);
+            parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet);
           }
         }
         result.setDatabases(databaseSet);
@@ -289,7 +291,10 @@ class ClusterSchemaFetchExecutor {
   }
 
   private void parseFetchedData(
-      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> 
databaseSet) {
+      Binary data,
+      ClusterSchemaTree resultSchemaTree,
+      ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer,
+      Set<String> databaseSet) {
     InputStream inputStream = new ByteArrayInputStream(data.getValues());
     try {
       byte type = ReadWriteIOUtils.readByte(inputStream);
@@ -299,7 +304,10 @@ class ClusterSchemaFetchExecutor {
           databaseSet.add(ReadWriteIOUtils.readString(inputStream));
         }
       } else if (type == 1) {
-        
resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+        deserializer.deserializeFromBatch(inputStream);
+      } else if (type == 2) {
+        deserializer.deserializeFromBatch(inputStream);
+        resultSchemaTree.mergeSchemaTree(deserializer.finish());
       } else {
         throw new RuntimeException(
             new MetadataException("Failed to fetch schema because of 
unrecognized data"));

Reply via email to