This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0e1dd1b8a2 Refine the inner structure of ClusterSchemaFetcher (#8603)
0e1dd1b8a2 is described below
commit 0e1dd1b8a20e5b93a36effce5f46d60be89f16b7
Author: Marcos_Zyk <[email protected]>
AuthorDate: Sat Dec 24 14:36:14 2022 +0800
Refine the inner structure of ClusterSchemaFetcher (#8603)
Refine the inner structure of ClusterSchemaFetcher (#8603)
---
.../iotdb/db/client/DataNodeInternalClient.java | 4 +-
.../db/metadata/cache/DataNodeSchemaCache.java | 4 -
.../execution/executor/RegionWriteExecutor.java | 2 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 2 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 2 +
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 2 +
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 599 ---------------------
.../analyze/schema/AutoCreateSchemaExecutor.java | 230 ++++++++
.../analyze/schema/ClusterSchemaFetchExecutor.java | 207 +++++++
.../plan/analyze/schema/ClusterSchemaFetcher.java | 354 ++++++++++++
.../plan/analyze/{ => schema}/ISchemaFetcher.java | 2 +-
.../plan/analyze/{ => schema}/SchemaValidator.java | 2 +-
.../db/mpp/plan/execution/QueryExecution.java | 2 +-
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 4 +-
.../protocol/rest/impl/GrafanaApiServiceImpl.java | 4 +-
.../db/protocol/rest/impl/RestApiServiceImpl.java | 4 +-
.../db/service/metrics/IoTDBInternalReporter.java | 4 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 4 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 4 +-
.../java/org/apache/iotdb/db/sync/SyncService.java | 2 +-
.../iotdb/db/sync/pipedata/load/ILoader.java | 4 +-
.../db/sync/transport/server/ReceiverManager.java | 2 +-
.../db/wal/recover/file/TsFilePlanRedoer.java | 2 +-
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 1 +
.../iotdb/db/mpp/plan/plan/distribution/Util.java | 2 +-
25 files changed, 821 insertions(+), 628 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 148cce158a..497fd2aa38 100644
---
a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.exception.IntoProcessException;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.query.control.SessionManager;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index c8a0f68b0b..01f6305806 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -142,10 +142,6 @@ public class DataNodeSchemaCache {
}
}
- public void put(String storageGroup, MeasurementPath measurementPath) {
- putSingleMeasurementPath(storageGroup, measurementPath);
- }
-
private void putSingleMeasurementPath(String storageGroup, MeasurementPath
measurementPath) {
SchemaCacheEntry schemaCacheEntry =
new SchemaCacheEntry(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c1f46626e7..73df88fd94 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -39,7 +39,7 @@ import
org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 3a3002850e..4940a4f538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 4d5d858b48..13213da4fa 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -53,6 +53,8 @@ import
org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 83f0378323..851069dc3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
deleted file mode 100644
index 16a49ae47a..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.analyze;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
-import org.apache.iotdb.db.metadata.template.ITemplateManager;
-import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
-import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import
org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
-public class ClusterSchemaFetcher implements ISchemaFetcher {
-
- private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- private final Coordinator coordinator = Coordinator.getInstance();
- private final DataNodeSchemaCache schemaCache =
DataNodeSchemaCache.getInstance();
- private final ITemplateManager templateManager =
ClusterTemplateManager.getInstance();
-
- private static final class ClusterSchemaFetcherHolder {
- private static final ClusterSchemaFetcher INSTANCE = new
ClusterSchemaFetcher();
-
- private ClusterSchemaFetcherHolder() {}
- }
-
- public static ClusterSchemaFetcher getInstance() {
- return ClusterSchemaFetcherHolder.INSTANCE;
- }
-
- private ClusterSchemaFetcher() {}
-
- @Override
- public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
- return fetchSchema(patternTree, false);
- }
-
- @Override
- public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
- return fetchSchema(patternTree, true);
- }
-
- private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean
withTags) {
- Map<Integer, Template> templateMap = new HashMap<>();
- patternTree.constructTree();
- List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
- for (PartialPath pattern : pathPatternList) {
- templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
- }
-
- if (withTags) {
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree,
templateMap, withTags));
- }
-
- List<PartialPath> fullPathList = new ArrayList<>();
- for (PartialPath pattern : pathPatternList) {
- if (!pattern.hasWildcard()) {
- fullPathList.add(pattern);
- }
- }
-
- if (fullPathList.isEmpty()) {
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree,
templateMap, withTags));
- }
-
- // The schema cache R/W and fetch operation must be locked together thus
the cache clean
- // operation executed by delete timeseries will be effective.
- schemaCache.takeReadLock();
- try {
- ClusterSchemaTree schemaTree;
- if (fullPathList.size() == pathPatternList.size()) {
- boolean isAllCached = true;
- schemaTree = new ClusterSchemaTree();
- ClusterSchemaTree cachedSchema;
- Set<String> storageGroupSet = new HashSet<>();
- for (PartialPath fullPath : fullPathList) {
- cachedSchema = schemaCache.get(fullPath);
- if (cachedSchema.isEmpty()) {
- isAllCached = false;
- break;
- } else {
- schemaTree.mergeSchemaTree(cachedSchema);
- storageGroupSet.addAll(cachedSchema.getDatabases());
- }
- }
- if (isAllCached) {
- schemaTree.setDatabases(storageGroupSet);
- return schemaTree;
- }
- }
-
- schemaTree =
- executeSchemaFetchQuery(new SchemaFetchStatement(patternTree,
templateMap, withTags));
-
- // only cache the schema fetched by full path
- List<MeasurementPath> measurementPathList;
- for (PartialPath fullPath : fullPathList) {
- measurementPathList = schemaTree.searchMeasurementPaths(fullPath).left;
- if (measurementPathList.isEmpty()) {
- continue;
- }
- schemaCache.put(
- schemaTree.getBelongedDatabase(measurementPathList.get(0)),
measurementPathList.get(0));
- }
- return schemaTree;
- } finally {
- schemaCache.releaseReadLock();
- }
- }
-
- private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement
schemaFetchStatement) {
- long queryId = SessionManager.getInstance().requestQueryId();
- try {
- ExecutionResult executionResult =
- coordinator.execute(
- schemaFetchStatement,
- queryId,
- null,
- "",
- ClusterPartitionFetcher.getInstance(),
- this,
- config.getQueryTimeoutThreshold());
- if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException(
- String.format(
- "cannot fetch schema, status is: %s, msg is: %s",
- executionResult.status.getCode(),
executionResult.status.getMessage()));
- }
- try (SetThreadName threadName = new
SetThreadName(executionResult.queryId.getId())) {
- ClusterSchemaTree result = new ClusterSchemaTree();
- Set<String> databaseSet = new HashSet<>();
- while (coordinator.getQueryExecution(queryId).hasNextResult()) {
- // The query will be transited to FINISHED when invoking
getBatchResult() at the last time
- // So we don't need to clean up it manually
- Optional<TsBlock> tsBlock;
- try {
- tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- } catch (IoTDBException e) {
- throw new RuntimeException("Fetch Schema failed. ", e);
- }
- if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
- break;
- }
- Column column = tsBlock.get().getColumn(0);
- for (int i = 0; i < column.getPositionCount(); i++) {
- parseFetchedData(column.getBinary(i), result, databaseSet);
- }
- }
- result.setDatabases(databaseSet);
- return result;
- }
- } finally {
- coordinator.cleanupQueryExecution(queryId);
- }
- }
-
- private void parseFetchedData(
- Binary data, ClusterSchemaTree resultSchemaTree, Set<String>
databaseSet) {
- InputStream inputStream = new ByteArrayInputStream(data.getValues());
- try {
- byte type = ReadWriteIOUtils.readByte(inputStream);
- if (type == 0) {
- int size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- databaseSet.add(ReadWriteIOUtils.readString(inputStream));
- }
- } else if (type == 1) {
-
resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
- } else {
- throw new RuntimeException(
- new MetadataException("Failed to fetch schema because of
unrecognized data"));
- }
- } catch (IOException e) {
- // Totally memory operation. This case won't happen.
- }
- }
-
- @Override
- public ISchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath,
- String[] measurements,
- Function<Integer, TSDataType> getDataType,
- boolean isAligned) {
- // The schema cache R/W and fetch operation must be locked together thus
the cache clean
- // operation executed by delete timeseries will be effective.
- schemaCache.takeReadLock();
- try {
- ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
- List<Integer> indexOfMissingMeasurements =
- checkMissingMeasurements(schemaTree, devicePath, measurements);
-
- // all schema can be taken from cache
- if (indexOfMissingMeasurements.isEmpty()) {
- return schemaTree;
- }
-
- // try fetch the missing schema from remote and cache fetched schema
- PathPatternTree patternTree = new PathPatternTree();
- for (int index : indexOfMissingMeasurements) {
- patternTree.appendFullPath(devicePath, measurements[index]);
- }
- ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
- if (!remoteSchemaTree.isEmpty()) {
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
- }
-
- if (!config.isAutoCreateSchemaEnabled()) {
- return schemaTree;
- }
-
- // auto create the still missing schema and merge them into schemaTree
- checkAndAutoCreateMissingMeasurements(
- schemaTree,
- devicePath,
- indexOfMissingMeasurements,
- measurements,
- getDataType,
- null,
- null,
- isAligned);
-
- return schemaTree;
- } finally {
- schemaCache.releaseReadLock();
- }
- }
-
- @Override
- public ISchemaTree fetchSchemaListWithAutoCreate(
- List<PartialPath> devicePathList,
- List<String[]> measurementsList,
- List<TSDataType[]> tsDataTypesList,
- List<Boolean> isAlignedList) {
- return fetchSchemaListWithAutoCreate(
- devicePathList, measurementsList, tsDataTypesList, null, null,
isAlignedList);
- }
-
- @Override
- public ISchemaTree fetchSchemaListWithAutoCreate(
- List<PartialPath> devicePathList,
- List<String[]> measurementsList,
- List<TSDataType[]> tsDataTypesList,
- List<TSEncoding[]> encodingsList,
- List<CompressionType[]> compressionTypesList,
- List<Boolean> isAlignedList) {
- // The schema cache R/W and fetch operation must be locked together thus
the cache clean
- // operation executed by delete timeseries will be effective.
- schemaCache.takeReadLock();
- try {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- PathPatternTree patternTree = new PathPatternTree();
- List<List<Integer>> indexOfMissingMeasurementsList = new
ArrayList<>(devicePathList.size());
- for (int i = 0; i < devicePathList.size(); i++) {
- schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i),
measurementsList.get(i)));
- List<Integer> indexOfMissingMeasurements =
- checkMissingMeasurements(schemaTree, devicePathList.get(i),
measurementsList.get(i));
- indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
- for (int index : indexOfMissingMeasurements) {
- patternTree.appendFullPath(devicePathList.get(i),
measurementsList.get(i)[index]);
- }
- }
-
- // all schema can be taken from cache
- if (patternTree.isEmpty()) {
- return schemaTree;
- }
-
- // try fetch the missing schema from remote and cache fetched schema
- ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
- if (!remoteSchemaTree.isEmpty()) {
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
- }
-
- if (!config.isAutoCreateSchemaEnabled()) {
- return schemaTree;
- }
-
- // auto create the still missing schema and merge them into schemaTree
- for (int i = 0; i < devicePathList.size(); i++) {
- int finalI = i;
- checkAndAutoCreateMissingMeasurements(
- schemaTree,
- devicePathList.get(i),
- indexOfMissingMeasurementsList.get(i),
- measurementsList.get(i),
- index -> tsDataTypesList.get(finalI)[index],
- encodingsList == null ? null : encodingsList.get(i),
- compressionTypesList == null ? null : compressionTypesList.get(i),
- isAlignedList.get(i));
- }
- return schemaTree;
- } finally {
- schemaCache.releaseReadLock();
- }
- }
-
- @Override
- public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
- return templateManager.checkTemplateSetInfo(path);
- }
-
- @Override
- public Map<Integer, Template> checkAllRelatedTemplate(PartialPath
pathPattern) {
- return templateManager.checkAllRelatedTemplate(pathPattern);
- }
-
- @Override
- public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String
templateName) {
- return templateManager.getAllPathsSetTemplate(templateName);
- }
-
- // check which measurements are missing and auto create the missing
measurements and merge them
- // into given schemaTree
- private void checkAndAutoCreateMissingMeasurements(
- ClusterSchemaTree schemaTree,
- PartialPath devicePath,
- List<Integer> indexOfMissingMeasurements,
- String[] measurements,
- Function<Integer, TSDataType> getDataType,
- TSEncoding[] encodings,
- CompressionType[] compressionTypes,
- boolean isAligned) {
- // check missing measurements
- DeviceSchemaInfo deviceSchemaInfo =
- schemaTree.searchDeviceSchemaInfo(
- devicePath,
- indexOfMissingMeasurements.stream()
- .map(index -> measurements[index])
- .collect(Collectors.toList()));
- if (deviceSchemaInfo != null) {
- List<MeasurementSchema> schemaList =
deviceSchemaInfo.getMeasurementSchemaList();
- int removedCount = 0;
- for (int i = 0, size = schemaList.size(); i < size; i++) {
- if (schemaList.get(i) != null) {
- indexOfMissingMeasurements.remove(i - removedCount);
- removedCount++;
- }
- }
- }
- if (indexOfMissingMeasurements.isEmpty()) {
- return;
- }
-
- // check whether there is template should be activated
- Pair<Template, PartialPath> templateInfo =
templateManager.checkTemplateSetInfo(devicePath);
- if (templateInfo != null) {
- Template template = templateInfo.left;
- boolean shouldActivateTemplate = false;
- for (int index : indexOfMissingMeasurements) {
- if (template.hasSchema(measurements[index])) {
- shouldActivateTemplate = true;
- break;
- }
- }
-
- if (shouldActivateTemplate) {
- internalActivateTemplate(devicePath);
- List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
- for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
- if (!template.hasSchema(measurements[i])) {
-
recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
- }
- }
- indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
- for (Map.Entry<String, IMeasurementSchema> entry :
template.getSchemaMap().entrySet()) {
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(entry.getKey()),
- (MeasurementSchema) entry.getValue(),
- null,
- null,
- template.isDirectAligned());
- }
-
- if (indexOfMissingMeasurements.isEmpty()) {
- return;
- }
- }
- }
-
- // auto create the rest missing timeseries
- List<String> missingMeasurements = new
ArrayList<>(indexOfMissingMeasurements.size());
- List<TSDataType> dataTypesOfMissingMeasurement =
- new ArrayList<>(indexOfMissingMeasurements.size());
- List<TSEncoding> encodingsOfMissingMeasurement =
- new ArrayList<>(indexOfMissingMeasurements.size());
- List<CompressionType> compressionTypesOfMissingMeasurement =
- new ArrayList<>(indexOfMissingMeasurements.size());
- indexOfMissingMeasurements.forEach(
- index -> {
- TSDataType tsDataType = getDataType.apply(index);
- // tsDataType == null means insert null value to a non-exist series
- // should skip creating them
- if (tsDataType != null) {
- missingMeasurements.add(measurements[index]);
- dataTypesOfMissingMeasurement.add(tsDataType);
- encodingsOfMissingMeasurement.add(
- encodings == null ? getDefaultEncoding(tsDataType) :
encodings[index]);
- compressionTypesOfMissingMeasurement.add(
- compressionTypes == null
- ?
TSFileDescriptor.getInstance().getConfig().getCompressor()
- : compressionTypes[index]);
- }
- });
-
- if (!missingMeasurements.isEmpty()) {
- schemaTree.mergeSchemaTree(
- internalCreateTimeseries(
- devicePath,
- missingMeasurements,
- dataTypesOfMissingMeasurement,
- encodingsOfMissingMeasurement,
- compressionTypesOfMissingMeasurement,
- isAligned));
- }
- }
-
- private List<Integer> checkMissingMeasurements(
- ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
- DeviceSchemaInfo deviceSchemaInfo =
- schemaTree.searchDeviceSchemaInfo(devicePath,
Arrays.asList(measurements));
- if (deviceSchemaInfo == null) {
- return IntStream.range(0,
measurements.length).boxed().collect(Collectors.toList());
- }
-
- List<Integer> indexOfMissingMeasurements = new ArrayList<>();
- List<MeasurementSchema> schemaList =
deviceSchemaInfo.getMeasurementSchemaList();
- for (int i = 0; i < measurements.length; i++) {
- if (schemaList.get(i) == null) {
- indexOfMissingMeasurements.add(i);
- }
- }
-
- return indexOfMissingMeasurements;
- }
-
- // try to create the target timeseries and return schemaTree involving
successfully created
- // timeseries and existing timeseries
- private ClusterSchemaTree internalCreateTimeseries(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> tsDataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- boolean isAligned) {
- List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
- new InternalCreateTimeSeriesStatement(
- devicePath, measurements, tsDataTypes, encodings, compressors,
isAligned));
-
- Set<Integer> alreadyExistingMeasurementIndexSet =
- measurementPathList.stream()
- .map(o -> measurements.indexOf(o.getMeasurement()))
- .collect(Collectors.toSet());
-
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- schemaTree.appendMeasurementPaths(measurementPathList);
-
- for (int i = 0, size = measurements.size(); i < size; i++) {
- if (alreadyExistingMeasurementIndexSet.contains(i)) {
- continue;
- }
-
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(measurements.get(i)),
- new MeasurementSchema(
- measurements.get(i), tsDataTypes.get(i), encodings.get(i),
compressors.get(i)),
- null,
- null,
- isAligned);
- }
-
- return schemaTree;
- }
-
- // auto create timeseries and return the existing timeseries info
- private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
- InternalCreateTimeSeriesStatement statement) {
-
- ExecutionResult executionResult = executeStatement(statement);
-
- int statusCode = executionResult.status.getCode();
- if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return Collections.emptyList();
- }
-
- if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- throw new RuntimeException(
- new IoTDBException(executionResult.status.getMessage(), statusCode));
- }
-
- Set<String> failedCreationSet = new HashSet<>();
- List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
- for (TSStatus subStatus : executionResult.status.subStatus) {
- if (subStatus.code ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- alreadyExistingMeasurements.add(
- MeasurementPath.parseDataFromString(subStatus.getMessage()));
- } else {
- failedCreationSet.add(subStatus.message);
- }
- }
-
- if (!failedCreationSet.isEmpty()) {
- throw new SemanticException(new MetadataException(String.join("; ",
failedCreationSet)));
- }
-
- return alreadyExistingMeasurements;
- }
-
- public void internalActivateTemplate(PartialPath devicePath) {
- ExecutionResult executionResult = executeStatement(new
ActivateTemplateStatement(devicePath));
- TSStatus status = executionResult.status;
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
- throw new RuntimeException(new IoTDBException(status.getMessage(),
status.getCode()));
- }
- }
-
- private ExecutionResult executeStatement(Statement statement) {
- long queryId = SessionManager.getInstance().requestQueryId();
- return coordinator.execute(
- statement,
- queryId,
- null,
- "",
- ClusterPartitionFetcher.getInstance(),
- this,
- config.getQueryTimeoutThreshold());
- }
-
- @Override
- public void invalidAllCache() {
- DataNodeSchemaCache.getInstance().cleanUp();
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..638e0206e0
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import
org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+class AutoCreateSchemaExecutor {
+
+ private final ITemplateManager templateManager;
+ private final Function<Statement, ExecutionResult> statementExecutor;
+
+ AutoCreateSchemaExecutor(
+ ITemplateManager templateManager, Function<Statement, ExecutionResult>
statementExecutor) {
+ this.templateManager = templateManager;
+ this.statementExecutor = statementExecutor;
+ }
+
+  /**
+   * Ensures that the given missing measurements of one device obtain a schema and merges that
+   * schema into {@code schemaTree}.
+   *
+   * <p>If a template is set for {@code devicePath} and it defines at least one of the target
+   * measurements, the template is activated on the device and every schema of the template is
+   * appended to {@code schemaTree}. Target measurements that the template does not define (or all
+   * of them when no template applies) are then created through an internal create-timeseries
+   * statement.
+   *
+   * @param schemaTree schema tree that receives the template schema and the created timeseries
+   * @param devicePath device that owns the measurements
+   * @param indexOfTargetMeasurements positions within {@code measurements} of the measurements
+   *     that still need a schema
+   * @param measurements measurement names; only the positions listed above are looked at
+   * @param getDataType maps a position within {@code measurements} to its data type; a {@code
+   *     null} result means the measurement is skipped
+   * @param encodings encodings indexed like {@code measurements}, or {@code null} to use the
+   *     default encoding of each data type
+   * @param compressionTypes compression types indexed like {@code measurements}, or {@code null}
+   *     to use the compressor from the TsFile configuration
+   * @param isAligned aligned flag passed to the create-timeseries statement
+   */
+  void autoCreateMissingMeasurements(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      TSEncoding[] encodings,
+      CompressionType[] compressionTypes,
+      boolean isAligned) {
+    // check whether there is a template that should be activated
+    Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
+    if (templateInfo != null) {
+      Template template = templateInfo.left;
+      boolean shouldActivateTemplate = false;
+      for (int index : indexOfTargetMeasurements) {
+        if (template.hasSchema(measurements[index])) {
+          shouldActivateTemplate = true;
+          break;
+        }
+      }
+
+      if (shouldActivateTemplate) {
+        internalActivateTemplate(devicePath);
+
+        // Keep only the targets the template does not define. Every entry of
+        // indexOfTargetMeasurements is itself a position within measurements, so the name has to
+        // be looked up through that entry and not through a loop counter over the index list.
+        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+        for (int index : indexOfTargetMeasurements) {
+          if (!template.hasSchema(measurements[index])) {
+            recheckedIndexOfMissingMeasurements.add(index);
+          }
+        }
+        indexOfTargetMeasurements = recheckedIndexOfMissingMeasurements;
+
+        // the activated template contributes all of its schemas to the result
+        for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
+          schemaTree.appendSingleMeasurement(
+              devicePath.concatNode(entry.getKey()),
+              (MeasurementSchema) entry.getValue(),
+              null,
+              null,
+              template.isDirectAligned());
+        }
+
+        if (indexOfTargetMeasurements.isEmpty()) {
+          return;
+        }
+      }
+    }
+
+    // auto create the rest missing timeseries
+    int targetCount = indexOfTargetMeasurements.size();
+    List<String> missingMeasurements = new ArrayList<>(targetCount);
+    List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>(targetCount);
+    List<TSEncoding> encodingsOfMissingMeasurement = new ArrayList<>(targetCount);
+    List<CompressionType> compressionTypesOfMissingMeasurement = new ArrayList<>(targetCount);
+    for (int index : indexOfTargetMeasurements) {
+      TSDataType tsDataType = getDataType.apply(index);
+      // tsDataType == null means insert null value to a non-exist series
+      // should skip creating them
+      if (tsDataType == null) {
+        continue;
+      }
+      missingMeasurements.add(measurements[index]);
+      dataTypesOfMissingMeasurement.add(tsDataType);
+      encodingsOfMissingMeasurement.add(
+          encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
+      compressionTypesOfMissingMeasurement.add(
+          compressionTypes == null
+              ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+              : compressionTypes[index]);
+    }
+
+    if (!missingMeasurements.isEmpty()) {
+      schemaTree.mergeSchemaTree(
+          internalCreateTimeseries(
+              devicePath,
+              missingMeasurements,
+              dataTypesOfMissingMeasurement,
+              encodingsOfMissingMeasurement,
+              compressionTypesOfMissingMeasurement,
+              isAligned));
+    }
+  }
+
+  /**
+   * Tries to create the target timeseries of one device.
+   *
+   * @return a schema tree holding both the timeseries reported as already existing and the
+   *     timeseries that were requested here with the given type, encoding and compressor
+   */
+  private ClusterSchemaTree internalCreateTimeseries(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    InternalCreateTimeSeriesStatement createStatement =
+        new InternalCreateTimeSeriesStatement(
+            devicePath, measurements, tsDataTypes, encodings, compressors, isAligned);
+    List<MeasurementPath> existingPaths = executeInternalCreateTimeseriesStatement(createStatement);
+
+    // positions (within measurements) of the timeseries that existed before this call
+    Set<Integer> existingPositions = new HashSet<>();
+    for (MeasurementPath existingPath : existingPaths) {
+      existingPositions.add(measurements.indexOf(existingPath.getMeasurement()));
+    }
+
+    ClusterSchemaTree createdSchemaTree = new ClusterSchemaTree();
+    createdSchemaTree.appendMeasurementPaths(existingPaths);
+
+    // everything that was not reported as existing is described by the requested schema
+    for (int position = 0; position < measurements.size(); position++) {
+      if (!existingPositions.contains(position)) {
+        String measurement = measurements.get(position);
+        MeasurementSchema requestedSchema =
+            new MeasurementSchema(
+                measurement,
+                tsDataTypes.get(position),
+                encodings.get(position),
+                compressors.get(position));
+        createdSchemaTree.appendSingleMeasurement(
+            devicePath.concatNode(measurement), requestedSchema, null, null, isAligned);
+      }
+    }
+
+    return createdSchemaTree;
+  }
+
+  /**
+   * Runs the internal create-timeseries statement.
+   *
+   * @return the timeseries that were reported as already existing; empty when the statement
+   *     succeeded as a whole
+   * @throws RuntimeException wrapping an {@link IoTDBException} when the status is neither success
+   *     nor a multiple-error status
+   * @throws SemanticException when any sub status is something other than "timeseries already
+   *     exist"; the message joins the failure messages with "; "
+   */
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
+      InternalCreateTimeSeriesStatement statement) {
+    TSStatus status = statementExecutor.apply(statement).status;
+    int statusCode = status.getCode();
+
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(new IoTDBException(status.getMessage(), statusCode));
+    }
+
+    // a multiple-error status carries one sub status per problem; sort them into the two buckets
+    List<MeasurementPath> existingTimeseries = new ArrayList<>();
+    Set<String> failureMessages = new HashSet<>();
+    for (TSStatus subStatus : status.subStatus) {
+      if (subStatus.code != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        failureMessages.add(subStatus.message);
+        continue;
+      }
+      existingTimeseries.add(MeasurementPath.parseDataFromString(subStatus.getMessage()));
+    }
+
+    if (failureMessages.isEmpty()) {
+      return existingTimeseries;
+    }
+    throw new SemanticException(new MetadataException(String.join("; ", failureMessages)));
+  }
+
+  /**
+   * Runs an activate-template statement for the given device.
+   *
+   * <p>Both a success status and a TEMPLATE_IS_IN_USE status are accepted; any other status is
+   * rethrown as a {@link RuntimeException} wrapping an {@link IoTDBException}.
+   */
+  private void internalActivateTemplate(PartialPath devicePath) {
+    TSStatus status = statementExecutor.apply(new ActivateTemplateStatement(devicePath)).status;
+
+    boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    if (succeeded) {
+      return;
+    }
+    boolean templateInUse = status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode();
+    if (templateInUse) {
+      return;
+    }
+    throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..3f8e609604
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class ClusterSchemaFetchExecutor {
+
+  /** Gives access to the running query execution and is used to clean it up afterwards. */
+  private final Coordinator coordinator;
+
+  /** Used to collect the templates related to the fetched path patterns. */
+  private final ITemplateManager templateManager;
+
+  /** Supplies the query id under which a schema fetch statement is executed. */
+  private final Supplier<Long> queryIdProvider;
+
+  /** Executes a statement under the given query id. */
+  private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+
+  /** Receives every non-empty schema tree that was fetched for caching. */
+  private final Consumer<ClusterSchemaTree> schemaCacheUpdater;
+
+  /**
+   * Creates a schema fetch executor.
+   *
+   * @param coordinator coordinator that owns the query executions
+   * @param templateManager source of the templates related to a path pattern
+   * @param queryIdProvider supplier of fresh query ids
+   * @param statementExecutor callback that runs a statement under a query id
+   * @param schemaCacheUpdater callback that stores a fetched schema tree in the cache
+   */
+  ClusterSchemaFetchExecutor(
+      Coordinator coordinator,
+      ITemplateManager templateManager,
+      Supplier<Long> queryIdProvider,
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+      Consumer<ClusterSchemaTree> schemaCacheUpdater) {
+    this.schemaCacheUpdater = schemaCacheUpdater;
+    this.statementExecutor = statementExecutor;
+    this.queryIdProvider = queryIdProvider;
+    this.templateManager = templateManager;
+    this.coordinator = coordinator;
+  }
+
+  /**
+   * Fetches schema for a pattern tree that may contain wildcards, or whose result does not need to
+   * be cached. The fetched schema is returned as is and is not handed to the cache updater.
+   *
+   * @param patternTree the path patterns to fetch schema for
+   * @param withTags flag forwarded to the schema fetch statement
+   * @return fetched schema
+   */
+  ClusterSchemaTree fetchSchemaOfFuzzyMatch(PathPatternTree patternTree, boolean withTags) {
+    Map<Integer, Template> relatedTemplates = analyzeTemplate(patternTree.getAllPathPatterns());
+    SchemaFetchStatement fetchStatement =
+        new SchemaFetchStatement(patternTree, relatedTemplates, withTags);
+    return executeSchemaFetchQuery(fetchStatement);
+  }
+
+  /**
+   * Fetches schema for a pattern tree without wildcards and passes a non-empty result to the
+   * schema cache updater.
+   *
+   * @param fullPathList all the full paths (no wildcard) split from {@code rawPatternTree}
+   * @param rawPatternTree the pattern tree consisting of the paths in {@code fullPathList}
+   * @return fetched schema
+   */
+  ClusterSchemaTree fetchSchemaOfPreciseMatch(
+      List<PartialPath> fullPathList, PathPatternTree rawPatternTree) {
+    Map<Integer, Template> relatedTemplates = analyzeTemplate(fullPathList);
+    ClusterSchemaTree fetchedSchema =
+        executeSchemaFetchQuery(new SchemaFetchStatement(rawPatternTree, relatedTemplates, false));
+    if (fetchedSchema.isEmpty()) {
+      return fetchedSchema;
+    }
+    schemaCacheUpdater.accept(fetchedSchema);
+    return fetchedSchema;
+  }
+
+  /**
+   * Fetches (and caches) the schema of selected measurements of a single device.
+   *
+   * @param devicePath the device
+   * @param measurements measurement names of the device
+   * @param indexOfTargetMeasurements positions within {@code measurements} to fetch
+   * @return fetched schema
+   */
+  ClusterSchemaTree fetchSchemaOfOneDevice(
+      PartialPath devicePath, String[] measurements, List<Integer> indexOfTargetMeasurements) {
+    PathPatternTree targetPaths = new PathPatternTree();
+    indexOfTargetMeasurements.forEach(
+        index -> targetPaths.appendFullPath(devicePath, measurements[index]));
+    targetPaths.constructTree();
+    return fetchSchemaAndCacheResult(targetPaths);
+  }
+
+  /**
+   * Fetches (and caches) the schema of selected measurements of several devices in one query.
+   *
+   * @param devicePathList the devices
+   * @param measurementsList measurement names, one array per device
+   * @param indexOfTargetMeasurementsList per device, the positions within its measurement array
+   *     to fetch
+   * @return fetched schema
+   */
+  ClusterSchemaTree fetchSchemaOfMultiDevices(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<List<Integer>> indexOfTargetMeasurementsList) {
+    PathPatternTree targetPaths = new PathPatternTree();
+    for (int deviceIdx = 0; deviceIdx < devicePathList.size(); deviceIdx++) {
+      List<Integer> targetIndexes = indexOfTargetMeasurementsList.get(deviceIdx);
+      if (targetIndexes.isEmpty()) {
+        // nothing to fetch for this device
+        continue;
+      }
+      PartialPath devicePath = devicePathList.get(deviceIdx);
+      String[] measurements = measurementsList.get(deviceIdx);
+      for (int index : targetIndexes) {
+        targetPaths.appendFullPath(devicePath, measurements[index]);
+      }
+    }
+    targetPaths.constructTree();
+    return fetchSchemaAndCacheResult(targetPaths);
+  }
+
+  /**
+   * Fetches the schema matching the given pattern tree and, when the result is not empty, passes
+   * it to the schema cache updater.
+   */
+  private ClusterSchemaTree fetchSchemaAndCacheResult(PathPatternTree patternTree) {
+    Map<Integer, Template> relatedTemplates = analyzeTemplate(patternTree.getAllPathPatterns());
+    SchemaFetchStatement fetchStatement =
+        new SchemaFetchStatement(patternTree, relatedTemplates, false);
+    ClusterSchemaTree fetchedSchema = executeSchemaFetchQuery(fetchStatement);
+    if (fetchedSchema.isEmpty()) {
+      return fetchedSchema;
+    }
+    schemaCacheUpdater.accept(fetchedSchema);
+    return fetchedSchema;
+  }
+
+  /**
+   * Collects, keyed by the Integer key the template manager reports, every template related to
+   * any of the given path patterns.
+   */
+  private Map<Integer, Template> analyzeTemplate(List<PartialPath> pathPatternList) {
+    Map<Integer, Template> relatedTemplates = new HashMap<>();
+    pathPatternList.forEach(
+        pattern -> relatedTemplates.putAll(templateManager.checkAllRelatedTemplate(pattern)));
+    return relatedTemplates;
+  }
+
+ /**
+ * Executes the given schema fetch statement under a fresh query id and assembles the returned
+ * blocks into one schema tree.
+ *
+ * <p>Each value of the first column of every returned block is decoded by parseFetchedData,
+ * which fills the result tree and the database set. The query execution is cleaned up in the
+ * finally block on every exit path.
+ *
+ * @param schemaFetchStatement the statement to execute
+ * @return the assembled schema tree with its databases set
+ * @throws RuntimeException if the statement does not finish with a success status or a batch
+ *     cannot be read
+ */
+ private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement
schemaFetchStatement) {
+ long queryId = queryIdProvider.get();
+ try {
+ ExecutionResult executionResult = statementExecutor.apply(queryId,
schemaFetchStatement);
+ if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException(
+ String.format(
+ "cannot fetch schema, status is: %s, msg is: %s",
+ executionResult.status.getCode(),
executionResult.status.getMessage()));
+ }
+ // threadName is never read; it is declared so that try-with-resources closes it again
+ try (SetThreadName threadName = new
SetThreadName(executionResult.queryId.getId())) {
+ ClusterSchemaTree result = new ClusterSchemaTree();
+ Set<String> databaseSet = new HashSet<>();
+ while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+ // The query will be transited to FINISHED when invoking getBatchResult() for the last
+ // time. The finally block below nevertheless calls cleanupQueryExecution, which also
+ // covers the early exits (exception, empty batch).
+ Optional<TsBlock> tsBlock;
+ try {
+ tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+ } catch (IoTDBException e) {
+ throw new RuntimeException("Fetch Schema failed. ", e);
+ }
+ // an absent or empty batch ends the read loop
+ if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+ break;
+ }
+ // only the first column is read; each position holds one serialized piece of the result
+ Column column = tsBlock.get().getColumn(0);
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ parseFetchedData(column.getBinary(i), result, databaseSet);
+ }
+ }
+ result.setDatabases(databaseSet);
+ return result;
+ }
+ } finally {
+ coordinator.cleanupQueryExecution(queryId);
+ }
+ }
+
+  /**
+   * Decodes one serialized piece of a schema fetch result.
+   *
+   * <p>The first byte selects the payload: {@code 0} is followed by an int count and that many
+   * database names, which are added to {@code databaseSet}; {@code 1} is followed by a serialized
+   * {@link ClusterSchemaTree}, which is merged into {@code resultSchemaTree}.
+   *
+   * @param data the serialized piece
+   * @param resultSchemaTree schema tree that accumulates the fetched schema
+   * @param databaseSet set that accumulates the fetched database names
+   * @throws RuntimeException if the type byte is unrecognized or the payload cannot be read
+   */
+  private void parseFetchedData(
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of unrecognized data"));
+      }
+    } catch (IOException e) {
+      // The stream is backed by an in-memory byte array, so an IOException can only come from
+      // data that could not be decoded. Report it instead of continuing with a silently
+      // incomplete schema.
+      throw new RuntimeException("Failed to fetch schema because of unreadable data", e);
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
new file mode 100644
index 0000000000..fb835ff76f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ClusterSchemaFetcher implements ISchemaFetcher {
+
+  /** Node configuration; read for the auto-create switch and the query timeout. */
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Executes the internally generated statements. */
+  private final Coordinator coordinator = Coordinator.getInstance();
+
+  /** Local schema cache that is consulted before any remote fetch. */
+  private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
+
+  /** Source of template information. */
+  private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
+
+  /**
+   * Activates templates and auto creates timeseries; every statement it issues runs under a newly
+   * requested query id.
+   */
+  private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+      new AutoCreateSchemaExecutor(
+          templateManager,
+          statement ->
+              coordinator.execute(
+                  statement,
+                  SessionManager.getInstance().requestQueryId(),
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()));
+
+  /** Fetches schema through schema fetch statements and stores fetched schema in the cache. */
+  private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+      new ClusterSchemaFetchExecutor(
+          coordinator,
+          templateManager,
+          () -> SessionManager.getInstance().requestQueryId(),
+          (queryId, statement) -> {
+            return coordinator.execute(
+                statement,
+                queryId,
+                null,
+                "",
+                ClusterPartitionFetcher.getInstance(),
+                this,
+                config.getQueryTimeoutThreshold());
+          },
+          fetchedSchemaTree -> schemaCache.put(fetchedSchemaTree));
+
+  /** Not instantiable from outside; the only instance is created by the holder class. */
+  private ClusterSchemaFetcher() {}
+
+  /** Holder whose static field carries the single {@link ClusterSchemaFetcher} instance. */
+  private static final class ClusterSchemaFetcherHolder {
+
+    private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
+
+    private ClusterSchemaFetcherHolder() {}
+  }
+
+  /** Returns the shared {@link ClusterSchemaFetcher} instance. */
+  public static ClusterSchemaFetcher getInstance() {
+    return ClusterSchemaFetcherHolder.INSTANCE;
+  }
+
+  /**
+   * Fetches the schema matching the given pattern tree.
+   *
+   * <p>If any pattern contains a wildcard, the schema is fetched remotely without touching the
+   * cache. Otherwise every full path is looked up in the schema cache first; only when at least
+   * one of them is not cached is the schema fetched remotely (and cached by the fetch executor).
+   *
+   * @param patternTree the path patterns to fetch schema for
+   * @return the matching schema
+   */
+  @Override
+  public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    List<PartialPath> fullPathList = new ArrayList<>();
+    for (PartialPath pattern : pathPatternList) {
+      if (!pattern.hasWildcard()) {
+        fullPathList.add(pattern);
+      }
+    }
+
+    // at least one pattern has a wildcard
+    if (fullPathList.size() < pathPatternList.size()) {
+      return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, false);
+    }
+
+    // From here on every pattern is a full path (fullPathList is a filtered copy of
+    // pathPatternList, so the sizes are necessarily equal).
+
+    // The schema cache R/W and fetch operation must be locked together thus the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+      Set<String> storageGroupSet = new HashSet<>();
+      boolean isAllCached = true;
+      for (PartialPath fullPath : fullPathList) {
+        ClusterSchemaTree cachedSchema = schemaCache.get(fullPath);
+        if (cachedSchema.isEmpty()) {
+          // one miss is enough to fall back to the remote fetch
+          isAllCached = false;
+          break;
+        }
+        schemaTree.mergeSchemaTree(cachedSchema);
+        storageGroupSet.addAll(cachedSchema.getDatabases());
+      }
+      if (isAllCached) {
+        schemaTree.setDatabases(storageGroupSet);
+        return schemaTree;
+      }
+
+      return clusterSchemaFetchExecutor.fetchSchemaOfPreciseMatch(fullPathList, patternTree);
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  /**
+   * Fetches the schema matching the given pattern tree with the tags flag of the fetch enabled.
+   * The schema cache is not consulted.
+   */
+  @Override
+  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    final boolean withTags = true;
+    return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, withTags);
+  }
+
+  /**
+   * Returns the schema of the given measurements of one device.
+   *
+   * <p>The schema is taken from the cache where possible, the rest is fetched remotely, and, if
+   * auto creation is enabled in the configuration, measurements that are still missing afterwards
+   * are auto created.
+   *
+   * @param devicePath the device
+   * @param measurements measurement names of the device
+   * @param getDataType maps a position within {@code measurements} to its data type
+   * @param isAligned aligned flag used for auto created timeseries
+   * @return the collected schema
+   */
+  @Override
+  public ISchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean isAligned) {
+    // The schema cache R/W and fetch operation must be locked together thus the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree collectedSchema = schemaCache.get(devicePath, measurements);
+      List<Integer> missingIndexes =
+          checkMissingMeasurements(collectedSchema, devicePath, measurements);
+
+      // something could not be taken from cache
+      if (!missingIndexes.isEmpty()) {
+        // try fetch the missing schema from remote and cache fetched schema
+        ClusterSchemaTree remoteSchema =
+            clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+                devicePath, measurements, missingIndexes);
+        if (!remoteSchema.isEmpty()) {
+          collectedSchema.mergeSchemaTree(remoteSchema);
+        }
+
+        if (config.isAutoCreateSchemaEnabled()) {
+          // auto create the still missing schema and merge them into the collected schema
+          checkAndAutoCreateMissingMeasurements(
+              collectedSchema,
+              devicePath,
+              missingIndexes,
+              measurements,
+              getDataType,
+              null,
+              null,
+              isAligned);
+        }
+      }
+
+      return collectedSchema;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  /**
+   * Same as the six-argument overload, called with {@code null} for both the encodings and the
+   * compression types.
+   */
+  @Override
+  public ISchemaTree fetchSchemaListWithAutoCreate(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<Boolean> isAlignedList) {
+    final List<TSEncoding[]> noEncodings = null;
+    final List<CompressionType[]> noCompressionTypes = null;
+    return fetchSchemaListWithAutoCreate(
+        devicePathList,
+        measurementsList,
+        tsDataTypesList,
+        noEncodings,
+        noCompressionTypes,
+        isAlignedList);
+  }
+
+  /**
+   * Returns the schema of the given measurements of several devices.
+   *
+   * <p>The schema is taken from the cache where possible, the rest is fetched remotely in a single
+   * query, and, if auto creation is enabled in the configuration, measurements that are still
+   * missing afterwards are auto created device by device.
+   *
+   * @param devicePathList the devices
+   * @param measurementsList measurement names, one array per device
+   * @param tsDataTypesList data types, one array per device, indexed like the measurement array
+   * @param encodingsList encodings per device, or {@code null}
+   * @param compressionTypesList compression types per device, or {@code null}
+   * @param isAlignedList aligned flag per device
+   * @return the collected schema
+   */
+  @Override
+  public ISchemaTree fetchSchemaListWithAutoCreate(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<TSEncoding[]> encodingsList,
+      List<CompressionType[]> compressionTypesList,
+      List<Boolean> isAlignedList) {
+    // The schema cache R/W and fetch operation must be locked together thus the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree collectedSchema = new ClusterSchemaTree();
+      List<List<Integer>> missingIndexesPerDevice = new ArrayList<>(devicePathList.size());
+      boolean anythingMissing = false;
+      for (int deviceIdx = 0; deviceIdx < devicePathList.size(); deviceIdx++) {
+        PartialPath devicePath = devicePathList.get(deviceIdx);
+        String[] measurements = measurementsList.get(deviceIdx);
+        collectedSchema.mergeSchemaTree(schemaCache.get(devicePath, measurements));
+        List<Integer> missingIndexes =
+            checkMissingMeasurements(collectedSchema, devicePath, measurements);
+        missingIndexesPerDevice.add(missingIndexes);
+        anythingMissing = anythingMissing || !missingIndexes.isEmpty();
+      }
+
+      // all schema can be taken from cache
+      if (!anythingMissing) {
+        return collectedSchema;
+      }
+
+      // try fetch the missing schema from remote and cache fetched schema
+      ClusterSchemaTree remoteSchema =
+          clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
+              devicePathList, measurementsList, missingIndexesPerDevice);
+      if (!remoteSchema.isEmpty()) {
+        collectedSchema.mergeSchemaTree(remoteSchema);
+      }
+
+      if (!config.isAutoCreateSchemaEnabled()) {
+        return collectedSchema;
+      }
+
+      // auto create the still missing schema and merge them into the collected schema
+      for (int deviceIdx = 0; deviceIdx < devicePathList.size(); deviceIdx++) {
+        // the data type array is looked up lazily, only when a data type is actually requested
+        final int capturedDeviceIdx = deviceIdx;
+        Function<Integer, TSDataType> dataTypeOfIndex =
+            index -> tsDataTypesList.get(capturedDeviceIdx)[index];
+        checkAndAutoCreateMissingMeasurements(
+            collectedSchema,
+            devicePathList.get(deviceIdx),
+            missingIndexesPerDevice.get(deviceIdx),
+            measurementsList.get(deviceIdx),
+            dataTypeOfIndex,
+            encodingsList != null ? encodingsList.get(deviceIdx) : null,
+            compressionTypesList != null ? compressionTypesList.get(deviceIdx) : null,
+            isAlignedList.get(deviceIdx));
+      }
+      return collectedSchema;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+ return templateManager.checkTemplateSetInfo(path);
+ }
+
+ @Override
+ public Map<Integer, Template> checkAllRelatedTemplate(PartialPath
pathPattern) {
+ return templateManager.checkAllRelatedTemplate(pathPattern);
+ }
+
+ @Override
+ public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String
templateName) {
+ return templateManager.getAllPathsSetTemplate(templateName);
+ }
+
+ // Re-checks which of the given measurements are still missing in schemaTree, then auto creates
+ // the ones that are and merges them into the given schemaTree.
+ //
+ // indexOfMissingMeasurements holds positions within measurements. The list is modified in
+ // place: positions whose measurement is found in schemaTree are removed before the remaining
+ // ones are handed to the auto create executor.
+ private void checkAndAutoCreateMissingMeasurements(
+ ClusterSchemaTree schemaTree,
+ PartialPath devicePath,
+ List<Integer> indexOfMissingMeasurements,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ TSEncoding[] encodings,
+ CompressionType[] compressionTypes,
+ boolean isAligned) {
+ // check missing measurements: look up only the names that were missing so far
+ DeviceSchemaInfo deviceSchemaInfo =
+ schemaTree.searchDeviceSchemaInfo(
+ devicePath,
+ indexOfMissingMeasurements.stream()
+ .map(index -> measurements[index])
+ .collect(Collectors.toList()));
+ if (deviceSchemaInfo != null) {
+ // NOTE(review): schemaList is treated as positionally aligned with the requested name list,
+ // a null entry meaning "not found" - confirm against DeviceSchemaInfo.
+ List<MeasurementSchema> schemaList =
deviceSchemaInfo.getMeasurementSchemaList();
+ int removedCount = 0;
+ for (int i = 0, size = schemaList.size(); i < size; i++) {
+ if (schemaList.get(i) != null) {
+ // The argument is an int, so this is List.remove(int position), not remove(Object).
+ // Subtracting removedCount compensates for the entries already removed before it.
+ indexOfMissingMeasurements.remove(i - removedCount);
+ removedCount++;
+ }
+ }
+ }
+ // everything was found after all, nothing to create
+ if (indexOfMissingMeasurements.isEmpty()) {
+ return;
+ }
+
+ autoCreateSchemaExecutor.autoCreateMissingMeasurements(
+ schemaTree,
+ devicePath,
+ indexOfMissingMeasurements,
+ measurements,
+ getDataType,
+ encodings,
+ compressionTypes,
+ isAligned);
+ }
+
+  /**
+   * Determines which of the given measurements of a device have no schema in {@code schemaTree}.
+   *
+   * @param schemaTree the schema tree to search
+   * @param devicePath the device
+   * @param measurements measurement names of the device
+   * @return the positions within {@code measurements} that are missing, in ascending order; all
+   *     positions when the device is not found at all. The list is always a mutable {@link
+   *     ArrayList} because checkAndAutoCreateMissingMeasurements later removes entries from it.
+   */
+  private List<Integer> checkMissingMeasurements(
+      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+    if (deviceSchemaInfo == null) {
+      // Collectors.toList() gives no mutability guarantee, so ask for an ArrayList explicitly.
+      return IntStream.range(0, measurements.length)
+          .boxed()
+          .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+    for (int i = 0; i < measurements.length; i++) {
+      // a null entry marks a measurement without schema
+      if (schemaList.get(i) == null) {
+        indexOfMissingMeasurements.add(i);
+      }
+    }
+
+    return indexOfMissingMeasurements;
+  }
+
+ @Override
+ public void invalidAllCache() {
+ DataNodeSchemaCache.getInstance().cleanUp();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 5ec75fe1da..37516321c0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
similarity index 98%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index cb9b8b8899..3882d6e34d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 99a7f606c7..2c87021c56 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -36,8 +36,8 @@ import
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
import
org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
diff --git
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a55522c185..53d5241ef2 100644
---
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.query.control.SessionManager;
diff --git
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
index c4575725bc..f5e1c31de4 100644
---
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
index 0eeea0af1d..2f00f2e5dd 100644
---
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
index 337a3ec65d..caf253c1eb 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
@@ -27,9 +27,9 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 49eb8abdff..29d07a462d 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -35,9 +35,9 @@ import
org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 40c9ef556a..75f0bc0421 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -76,9 +76,9 @@ import
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index ccc95932b5..d5df9a8a73 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
index 5936207a55..05ffb9c7df 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.sync.pipedata.load;
import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
/**
* This interface is used to load files, including tsFile, syncTask, schema,
modsFile and
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index d0c348fda1..a8ffdab2d3 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.query.control.SessionManager;
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index f468e09d9b..89430bc91f 100644
---
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 61951feee4..a65b83d03a 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index a4a3c6c580..7c908b2f4d 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -46,7 +46,7 @@ import
org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;