This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch transferSchemaTreeInBatches in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit da522b91201c06e2297a2f9ae5dff8d92fc7712f Author: shuwenwei <[email protected]> AuthorDate: Thu Jul 24 11:05:33 2025 +0800 Transfer schema tree in batches --- .../common/schematree/ClusterSchemaTree.java | 140 +++++++++++++++------ .../common/schematree/node/SchemaEntityNode.java | 5 + .../common/schematree/node/SchemaInternalNode.java | 3 + .../schematree/node/SchemaMeasurementNode.java | 4 + .../common/schematree/node/SchemaNode.java | 2 + .../operator/schema/SchemaFetchScanOperator.java | 67 ++++++---- .../queryengine/plan/analyze/AnalyzeVisitor.java | 2 - .../analyze/schema/ClusterSchemaFetchExecutor.java | 14 ++- 8 files changed, 170 insertions(+), 67 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java index 91781dcca0c..6e9806a4ee4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java @@ -50,8 +50,10 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT; @@ -485,8 +487,53 @@ public class ClusterSchemaTree implements ISchemaTree { root.serialize(outputStream); } - public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException { + public Iterator<SchemaNode> getIteratorForSerialize() { + return new SchemaNodePostOrderIterator(root); + } + + private static class SchemaNodePostOrderIterator implements Iterator<SchemaNode> { + private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new ArrayDeque<>(); + private SchemaNode nextNode; + + public SchemaNodePostOrderIterator(SchemaNode root) { + stack.push(new Pair<>(root, root.getChildrenIterator())); + prepareNext(); + } + + @Override + public boolean hasNext() { + return nextNode != null; + } + + @Override + public SchemaNode next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + SchemaNode result = nextNode; + prepareNext(); + return result; + } + + private void prepareNext() { + nextNode = null; + while (!stack.isEmpty()) { + Pair<SchemaNode, Iterator<SchemaNode>> pair = stack.peek(); + SchemaNode currentNode = pair.getLeft(); + Iterator<SchemaNode> childrenIterator = pair.getRight(); + if (childrenIterator.hasNext()) { + SchemaNode child = childrenIterator.next(); + stack.push(new Pair<>(child, child.getChildrenIterator())); + } else { + stack.pop(); + nextNode = currentNode; + return; + } + } + } + } + public static class SchemaNodeBatchDeserializer { byte nodeType; int childNum; Deque<SchemaNode> stack = new ArrayDeque<>(); @@ -495,49 +542,70 @@ public class ClusterSchemaTree implements ISchemaTree { boolean hasNormalTimeSeries = false; Map<Integer, Template> templateMap = new HashMap<>(); - while (inputStream.available() > 0) { - nodeType = ReadWriteIOUtils.readByte(inputStream); - if (nodeType == SCHEMA_MEASUREMENT_NODE) { - SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream); - stack.push(measurementNode); - if (measurementNode.isLogicalView()) { - hasLogicalView = true; - } - hasNormalTimeSeries = true; - } else { - SchemaInternalNode internalNode; - if (nodeType == SCHEMA_ENTITY_NODE) { - internalNode = SchemaEntityNode.deserialize(inputStream); - int templateId = internalNode.getAsEntityNode().getTemplateId(); - if (templateId != NON_TEMPLATE) { - templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId)); + public void deserializeFromBatch(InputStream inputStream) throws IOException { + while (inputStream.available() > 0) { + nodeType = ReadWriteIOUtils.readByte(inputStream); + if (nodeType == SCHEMA_MEASUREMENT_NODE) { + SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream); + stack.push(measurementNode); + if (measurementNode.isLogicalView()) { + hasLogicalView = true; } + hasNormalTimeSeries = true; } else { - internalNode = SchemaInternalNode.deserialize(inputStream); - } + SchemaInternalNode internalNode; + if (nodeType == SCHEMA_ENTITY_NODE) { + internalNode = SchemaEntityNode.deserialize(inputStream); + int templateId = internalNode.getAsEntityNode().getTemplateId(); + if (templateId != NON_TEMPLATE) { + templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId)); + } + } else { + internalNode = SchemaInternalNode.deserialize(inputStream); + } - childNum = ReadWriteIOUtils.readInt(inputStream); - while (childNum > 0) { - child = stack.pop(); - internalNode.addChild(child.getName(), child); - if (child.isMeasurement()) { - SchemaMeasurementNode measurementNode = child.getAsMeasurementNode(); - if (measurementNode.getAlias() != null) { - internalNode - .getAsEntityNode() - .addAliasChild(measurementNode.getAlias(), measurementNode); + childNum = ReadWriteIOUtils.readInt(inputStream); + while (childNum > 0) { + child = stack.pop(); + internalNode.addChild(child.getName(), child); + if (child.isMeasurement()) { + SchemaMeasurementNode measurementNode = child.getAsMeasurementNode(); + if (measurementNode.getAlias() != null) { + internalNode + .getAsEntityNode() + .addAliasChild(measurementNode.getAlias(), measurementNode); + } } + childNum--; } - childNum--; + stack.push(internalNode); } - stack.push(internalNode); } } - ClusterSchemaTree result = new ClusterSchemaTree(stack.poll()); - result.templateMap = templateMap; - result.hasLogicalMeasurementPath = hasLogicalView; - result.hasNormalTimeSeries = hasNormalTimeSeries; - return result; + + public ClusterSchemaTree finish() { + try { + ClusterSchemaTree result = new ClusterSchemaTree(stack.poll()); + result.templateMap = templateMap; + result.hasLogicalMeasurementPath = hasLogicalView; + result.hasNormalTimeSeries = hasNormalTimeSeries; + return result; + } finally { + nodeType = 0; + childNum = 0; + stack.clear(); + child = null; + hasLogicalView = false; + hasNormalTimeSeries = false; + templateMap = new HashMap<>(); + } + } + } + + public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException { + SchemaNodeBatchDeserializer schemaNodeBatchDeserializer = new SchemaNodeBatchDeserializer(); + schemaNodeBatchDeserializer.deserializeFromBatch(inputStream); + return schemaNodeBatchDeserializer.finish(); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java index b877dc9d7a2..c160a81f20f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java @@ -117,6 +117,11 @@ public class SchemaEntityNode extends SchemaInternalNode { @Override public void serialize(OutputStream outputStream) throws IOException { serializeChildren(outputStream); + this.serializeNodeOwnContent(outputStream); + } + + @Override + public void serializeNodeOwnContent(OutputStream outputStream) throws IOException { ReadWriteIOUtils.write(getType(), outputStream); ReadWriteIOUtils.write(name, outputStream); ReadWriteIOUtils.write(isAligned, outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java index 5c6de241baf..155396a0186 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java @@ -85,7 +85,10 @@ public class SchemaInternalNode extends SchemaNode { public void serialize(OutputStream outputStream) throws IOException { serializeChildren(outputStream); + serializeNodeOwnContent(outputStream); + } + public void serializeNodeOwnContent(OutputStream outputStream) throws IOException { ReadWriteIOUtils.write(getType(), outputStream); ReadWriteIOUtils.write(name, outputStream); ReadWriteIOUtils.write(children.size(), outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java index b1eaa7ff5c1..a896a57fdfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java @@ -143,6 +143,10 @@ public class SchemaMeasurementNode extends SchemaNode implements IMeasurementSch @Override public void serialize(OutputStream outputStream) throws IOException { + serializeNodeOwnContent(outputStream); + } + + public void serializeNodeOwnContent(OutputStream outputStream) throws IOException { ReadWriteIOUtils.write(getType(), outputStream); ReadWriteIOUtils.write(name, outputStream); ReadWriteIOUtils.write(alias, outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java index e2625cd97ac..0ca11bd0d7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java @@ -80,4 +80,6 @@ public abstract class SchemaNode implements ITreeNode { public abstract byte getType(); public abstract void serialize(OutputStream outputStream) throws IOException; + + public abstract void serializeNodeOwnContent(OutputStream outputStream) throws IOException; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java index f4b40c69b79..d504490c055 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.runtime.SchemaExecutionException; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; +import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator; @@ -36,12 +37,12 @@ import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.BinaryColumn; import org.apache.tsfile.read.common.block.column.TimeColumn; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; @@ -62,6 +63,10 @@ public class SchemaFetchScanOperator implements SourceOperator { private boolean isFinished = false; private final PathPatternTree authorityScope; + private Iterator<SchemaNode> schemaNodeIteratorForSerialize = null; + // Reserve some bytes to avoid capacity expansion + private PublicBAOS baos = new PublicBAOS(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 1024); + private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); @@ -152,12 +157,27 @@ public class SchemaFetchScanOperator implements SourceOperator { if (!hasNext()) { throw new NoSuchElementException(); } - isFinished = true; - try { - return fetchSchema(); - } catch (MetadataException e) { - throw new SchemaExecutionException(e); + + prepareSchemaNodeIterator(); + // to indicate this binary data is database info + ReadWriteIOUtils.write((byte) 1, baos); + while (schemaNodeIteratorForSerialize.hasNext() + && baos.size() < DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) { + SchemaNode node = schemaNodeIteratorForSerialize.next(); + node.serializeNodeOwnContent(baos); + } + byte[] currentBatch = baos.toByteArray(); + baos.reset(); + isFinished = !schemaNodeIteratorForSerialize.hasNext(); + if (isFinished) { + // indicate all continuous binary data is finished + currentBatch[0] = 2; + schemaNodeIteratorForSerialize = null; + baos = null; } + return new TsBlock( + new TimeColumn(1, new long[] {0}), + new BinaryColumn(1, Optional.empty(), new Binary[] {new Binary(currentBatch)})); } @Override @@ -172,7 +192,8 @@ public class SchemaFetchScanOperator implements SourceOperator { @Override public void close() throws Exception { - // do nothing + schemaNodeIteratorForSerialize = null; + baos = null; } @Override @@ -180,26 +201,20 @@ public class SchemaFetchScanOperator implements SourceOperator { return sourceId; } - private TsBlock fetchSchema() throws MetadataException { - ClusterSchemaTree schemaTree = - fetchDevice - ? schemaRegion.fetchDeviceSchema(patternTree, authorityScope) - : schemaRegion.fetchSeriesSchema( - patternTree, templateMap, withTags, withAttributes, withTemplate, withAliasForce); - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + private void prepareSchemaNodeIterator() { + if (schemaNodeIteratorForSerialize != null) { + return; + } try { - // to indicate this binary data is database info - ReadWriteIOUtils.write((byte) 1, outputStream); - - schemaTree.serialize(outputStream); - } catch (IOException e) { - // Totally memory operation. This case won't happen. + ClusterSchemaTree schemaTree = + fetchDevice + ? schemaRegion.fetchDeviceSchema(patternTree, authorityScope) + : schemaRegion.fetchSeriesSchema( + patternTree, templateMap, withTags, withAttributes, withTemplate, withAliasForce); + schemaNodeIteratorForSerialize = schemaTree.getIteratorForSerialize(); + } catch (MetadataException e) { + throw new SchemaExecutionException(e); } - return new TsBlock( - new TimeColumn(1, new long[] {0}), - new BinaryColumn( - 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 6009b07011e..9f43666a12b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -523,8 +523,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> context.setFetchSchemaCost(schemaFetchCost); QueryPlanCostMetricSet.getInstance().recordTreePlanCost(SCHEMA_FETCHER, schemaFetchCost); } - - analysis.setSchemaTree(schemaTree); return schemaTree; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 2153a0ccf4b..0d0e11e4019 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -257,6 +257,8 @@ class ClusterSchemaFetchExecutor { } try (SetThreadName ignored = new SetThreadName(executionResult.queryId.getId())) { ClusterSchemaTree result = new ClusterSchemaTree(); + ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer = + new ClusterSchemaTree.SchemaNodeBatchDeserializer(); Set<String> databaseSet = new HashSet<>(); while (coordinator.getQueryExecution(queryId).hasNextResult()) { // The query will be transited to FINISHED when invoking getBatchResult() at the last time @@ -274,7 +276,7 @@ class ClusterSchemaFetchExecutor { } Column column = tsBlock.get().getColumn(0); for (int i = 0; i < column.getPositionCount(); i++) { - parseFetchedData(column.getBinary(i), result, databaseSet); + parseFetchedData(column.getBinary(i), result, deserializer, databaseSet); } } result.setDatabases(databaseSet); @@ -289,7 +291,10 @@ class ClusterSchemaFetchExecutor { } private void parseFetchedData( - Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) { + Binary data, + ClusterSchemaTree resultSchemaTree, + ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer, + Set<String> databaseSet) { InputStream inputStream = new ByteArrayInputStream(data.getValues()); try { byte type = ReadWriteIOUtils.readByte(inputStream); @@ -299,7 +304,10 @@ class ClusterSchemaFetchExecutor { databaseSet.add(ReadWriteIOUtils.readString(inputStream)); } } else if (type == 1) { - resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream)); + deserializer.deserializeFromBatch(inputStream); + } else if (type == 2) { + deserializer.deserializeFromBatch(inputStream); + resultSchemaTree.mergeSchemaTree(deserializer.finish()); } else { throw new RuntimeException( new MetadataException("Failed to fetch schema because of unrecognized data"));
