This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e1dd1b8a2 Refine the inner structure of ClusterSchemaFetcher (#8603)
0e1dd1b8a2 is described below

commit 0e1dd1b8a20e5b93a36effce5f46d60be89f16b7
Author: Marcos_Zyk <[email protected]>
AuthorDate: Sat Dec 24 14:36:14 2022 +0800

    Refine the inner structure of ClusterSchemaFetcher (#8603)
    
    Refine the inner structure of ClusterSchemaFetcher (#8603)
---
 .../iotdb/db/client/DataNodeInternalClient.java    |   4 +-
 .../db/metadata/cache/DataNodeSchemaCache.java     |   4 -
 .../execution/executor/RegionWriteExecutor.java    |   2 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   2 +
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |   2 +
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 599 ---------------------
 .../analyze/schema/AutoCreateSchemaExecutor.java   | 230 ++++++++
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 207 +++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 354 ++++++++++++
 .../plan/analyze/{ => schema}/ISchemaFetcher.java  |   2 +-
 .../plan/analyze/{ => schema}/SchemaValidator.java |   2 +-
 .../db/mpp/plan/execution/QueryExecution.java      |   2 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |   4 +-
 .../protocol/rest/impl/GrafanaApiServiceImpl.java  |   4 +-
 .../db/protocol/rest/impl/RestApiServiceImpl.java  |   4 +-
 .../db/service/metrics/IoTDBInternalReporter.java  |   4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   4 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java |   2 +-
 .../iotdb/db/sync/pipedata/load/ILoader.java       |   4 +-
 .../db/sync/transport/server/ReceiverManager.java  |   2 +-
 .../db/wal/recover/file/TsFilePlanRedoer.java      |   2 +-
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   1 +
 .../iotdb/db/mpp/plan/plan/distribution/Util.java  |   2 +-
 25 files changed, 821 insertions(+), 628 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 148cce158a..497fd2aa38 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index c8a0f68b0b..01f6305806 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -142,10 +142,6 @@ public class DataNodeSchemaCache {
     }
   }
 
-  public void put(String storageGroup, MeasurementPath measurementPath) {
-    putSingleMeasurementPath(storageGroup, measurementPath);
-  }
-
   private void putSingleMeasurementPath(String storageGroup, MeasurementPath 
measurementPath) {
     SchemaCacheEntry schemaCacheEntry =
         new SchemaCacheEntry(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c1f46626e7..73df88fd94 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -39,7 +39,7 @@ import 
org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 3a3002850e..4940a4f538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 4d5d858b48..13213da4fa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -53,6 +53,8 @@ import 
org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 83f0378323..851069dc3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.mpp.plan.analyze;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 
 import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
deleted file mode 100644
index 16a49ae47a..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.analyze;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
-import org.apache.iotdb.db.metadata.template.ITemplateManager;
-import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
-import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import 
org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
-public class ClusterSchemaFetcher implements ISchemaFetcher {
-
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
-  private final Coordinator coordinator = Coordinator.getInstance();
-  private final DataNodeSchemaCache schemaCache = 
DataNodeSchemaCache.getInstance();
-  private final ITemplateManager templateManager = 
ClusterTemplateManager.getInstance();
-
-  private static final class ClusterSchemaFetcherHolder {
-    private static final ClusterSchemaFetcher INSTANCE = new 
ClusterSchemaFetcher();
-
-    private ClusterSchemaFetcherHolder() {}
-  }
-
-  public static ClusterSchemaFetcher getInstance() {
-    return ClusterSchemaFetcherHolder.INSTANCE;
-  }
-
-  private ClusterSchemaFetcher() {}
-
-  @Override
-  public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, false);
-  }
-
-  @Override
-  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, true);
-  }
-
-  private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean 
withTags) {
-    Map<Integer, Template> templateMap = new HashMap<>();
-    patternTree.constructTree();
-    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
-    for (PartialPath pattern : pathPatternList) {
-      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
-    }
-
-    if (withTags) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, 
templateMap, withTags));
-    }
-
-    List<PartialPath> fullPathList = new ArrayList<>();
-    for (PartialPath pattern : pathPatternList) {
-      if (!pattern.hasWildcard()) {
-        fullPathList.add(pattern);
-      }
-    }
-
-    if (fullPathList.isEmpty()) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, 
templateMap, withTags));
-    }
-
-    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
-    // operation executed by delete timeseries will be effective.
-    schemaCache.takeReadLock();
-    try {
-      ClusterSchemaTree schemaTree;
-      if (fullPathList.size() == pathPatternList.size()) {
-        boolean isAllCached = true;
-        schemaTree = new ClusterSchemaTree();
-        ClusterSchemaTree cachedSchema;
-        Set<String> storageGroupSet = new HashSet<>();
-        for (PartialPath fullPath : fullPathList) {
-          cachedSchema = schemaCache.get(fullPath);
-          if (cachedSchema.isEmpty()) {
-            isAllCached = false;
-            break;
-          } else {
-            schemaTree.mergeSchemaTree(cachedSchema);
-            storageGroupSet.addAll(cachedSchema.getDatabases());
-          }
-        }
-        if (isAllCached) {
-          schemaTree.setDatabases(storageGroupSet);
-          return schemaTree;
-        }
-      }
-
-      schemaTree =
-          executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, 
templateMap, withTags));
-
-      // only cache the schema fetched by full path
-      List<MeasurementPath> measurementPathList;
-      for (PartialPath fullPath : fullPathList) {
-        measurementPathList = schemaTree.searchMeasurementPaths(fullPath).left;
-        if (measurementPathList.isEmpty()) {
-          continue;
-        }
-        schemaCache.put(
-            schemaTree.getBelongedDatabase(measurementPathList.get(0)), 
measurementPathList.get(0));
-      }
-      return schemaTree;
-    } finally {
-      schemaCache.releaseReadLock();
-    }
-  }
-
-  private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement 
schemaFetchStatement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    try {
-      ExecutionResult executionResult =
-          coordinator.execute(
-              schemaFetchStatement,
-              queryId,
-              null,
-              "",
-              ClusterPartitionFetcher.getInstance(),
-              this,
-              config.getQueryTimeoutThreshold());
-      if (executionResult.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new RuntimeException(
-            String.format(
-                "cannot fetch schema, status is: %s, msg is: %s",
-                executionResult.status.getCode(), 
executionResult.status.getMessage()));
-      }
-      try (SetThreadName threadName = new 
SetThreadName(executionResult.queryId.getId())) {
-        ClusterSchemaTree result = new ClusterSchemaTree();
-        Set<String> databaseSet = new HashSet<>();
-        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-          // The query will be transited to FINISHED when invoking 
getBatchResult() at the last time
-          // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock;
-          try {
-            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-          } catch (IoTDBException e) {
-            throw new RuntimeException("Fetch Schema failed. ", e);
-          }
-          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
-            break;
-          }
-          Column column = tsBlock.get().getColumn(0);
-          for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, databaseSet);
-          }
-        }
-        result.setDatabases(databaseSet);
-        return result;
-      }
-    } finally {
-      coordinator.cleanupQueryExecution(queryId);
-    }
-  }
-
-  private void parseFetchedData(
-      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> 
databaseSet) {
-    InputStream inputStream = new ByteArrayInputStream(data.getValues());
-    try {
-      byte type = ReadWriteIOUtils.readByte(inputStream);
-      if (type == 0) {
-        int size = ReadWriteIOUtils.readInt(inputStream);
-        for (int i = 0; i < size; i++) {
-          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
-        }
-      } else if (type == 1) {
-        
resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
-      } else {
-        throw new RuntimeException(
-            new MetadataException("Failed to fetch schema because of 
unrecognized data"));
-      }
-    } catch (IOException e) {
-      // Totally memory operation. This case won't happen.
-    }
-  }
-
-  @Override
-  public ISchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath,
-      String[] measurements,
-      Function<Integer, TSDataType> getDataType,
-      boolean isAligned) {
-    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
-    // operation executed by delete timeseries will be effective.
-    schemaCache.takeReadLock();
-    try {
-      ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
-      List<Integer> indexOfMissingMeasurements =
-          checkMissingMeasurements(schemaTree, devicePath, measurements);
-
-      // all schema can be taken from cache
-      if (indexOfMissingMeasurements.isEmpty()) {
-        return schemaTree;
-      }
-
-      // try fetch the missing schema from remote and cache fetched schema
-      PathPatternTree patternTree = new PathPatternTree();
-      for (int index : indexOfMissingMeasurements) {
-        patternTree.appendFullPath(devicePath, measurements[index]);
-      }
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
-      if (!remoteSchemaTree.isEmpty()) {
-        schemaTree.mergeSchemaTree(remoteSchemaTree);
-        schemaCache.put(remoteSchemaTree);
-      }
-
-      if (!config.isAutoCreateSchemaEnabled()) {
-        return schemaTree;
-      }
-
-      // auto create the still missing schema and merge them into schemaTree
-      checkAndAutoCreateMissingMeasurements(
-          schemaTree,
-          devicePath,
-          indexOfMissingMeasurements,
-          measurements,
-          getDataType,
-          null,
-          null,
-          isAligned);
-
-      return schemaTree;
-    } finally {
-      schemaCache.releaseReadLock();
-    }
-  }
-
-  @Override
-  public ISchemaTree fetchSchemaListWithAutoCreate(
-      List<PartialPath> devicePathList,
-      List<String[]> measurementsList,
-      List<TSDataType[]> tsDataTypesList,
-      List<Boolean> isAlignedList) {
-    return fetchSchemaListWithAutoCreate(
-        devicePathList, measurementsList, tsDataTypesList, null, null, 
isAlignedList);
-  }
-
-  @Override
-  public ISchemaTree fetchSchemaListWithAutoCreate(
-      List<PartialPath> devicePathList,
-      List<String[]> measurementsList,
-      List<TSDataType[]> tsDataTypesList,
-      List<TSEncoding[]> encodingsList,
-      List<CompressionType[]> compressionTypesList,
-      List<Boolean> isAlignedList) {
-    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
-    // operation executed by delete timeseries will be effective.
-    schemaCache.takeReadLock();
-    try {
-      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-      PathPatternTree patternTree = new PathPatternTree();
-      List<List<Integer>> indexOfMissingMeasurementsList = new 
ArrayList<>(devicePathList.size());
-      for (int i = 0; i < devicePathList.size(); i++) {
-        schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), 
measurementsList.get(i)));
-        List<Integer> indexOfMissingMeasurements =
-            checkMissingMeasurements(schemaTree, devicePathList.get(i), 
measurementsList.get(i));
-        indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
-        for (int index : indexOfMissingMeasurements) {
-          patternTree.appendFullPath(devicePathList.get(i), 
measurementsList.get(i)[index]);
-        }
-      }
-
-      // all schema can be taken from cache
-      if (patternTree.isEmpty()) {
-        return schemaTree;
-      }
-
-      // try fetch the missing schema from remote and cache fetched schema
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
-      if (!remoteSchemaTree.isEmpty()) {
-        schemaTree.mergeSchemaTree(remoteSchemaTree);
-        schemaCache.put(remoteSchemaTree);
-      }
-
-      if (!config.isAutoCreateSchemaEnabled()) {
-        return schemaTree;
-      }
-
-      // auto create the still missing schema and merge them into schemaTree
-      for (int i = 0; i < devicePathList.size(); i++) {
-        int finalI = i;
-        checkAndAutoCreateMissingMeasurements(
-            schemaTree,
-            devicePathList.get(i),
-            indexOfMissingMeasurementsList.get(i),
-            measurementsList.get(i),
-            index -> tsDataTypesList.get(finalI)[index],
-            encodingsList == null ? null : encodingsList.get(i),
-            compressionTypesList == null ? null : compressionTypesList.get(i),
-            isAlignedList.get(i));
-      }
-      return schemaTree;
-    } finally {
-      schemaCache.releaseReadLock();
-    }
-  }
-
-  @Override
-  public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
-    return templateManager.checkTemplateSetInfo(path);
-  }
-
-  @Override
-  public Map<Integer, Template> checkAllRelatedTemplate(PartialPath 
pathPattern) {
-    return templateManager.checkAllRelatedTemplate(pathPattern);
-  }
-
-  @Override
-  public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String 
templateName) {
-    return templateManager.getAllPathsSetTemplate(templateName);
-  }
-
-  // check which measurements are missing and auto create the missing 
measurements and merge them
-  // into given schemaTree
-  private void checkAndAutoCreateMissingMeasurements(
-      ClusterSchemaTree schemaTree,
-      PartialPath devicePath,
-      List<Integer> indexOfMissingMeasurements,
-      String[] measurements,
-      Function<Integer, TSDataType> getDataType,
-      TSEncoding[] encodings,
-      CompressionType[] compressionTypes,
-      boolean isAligned) {
-    // check missing measurements
-    DeviceSchemaInfo deviceSchemaInfo =
-        schemaTree.searchDeviceSchemaInfo(
-            devicePath,
-            indexOfMissingMeasurements.stream()
-                .map(index -> measurements[index])
-                .collect(Collectors.toList()));
-    if (deviceSchemaInfo != null) {
-      List<MeasurementSchema> schemaList = 
deviceSchemaInfo.getMeasurementSchemaList();
-      int removedCount = 0;
-      for (int i = 0, size = schemaList.size(); i < size; i++) {
-        if (schemaList.get(i) != null) {
-          indexOfMissingMeasurements.remove(i - removedCount);
-          removedCount++;
-        }
-      }
-    }
-    if (indexOfMissingMeasurements.isEmpty()) {
-      return;
-    }
-
-    // check whether there is template should be activated
-    Pair<Template, PartialPath> templateInfo = 
templateManager.checkTemplateSetInfo(devicePath);
-    if (templateInfo != null) {
-      Template template = templateInfo.left;
-      boolean shouldActivateTemplate = false;
-      for (int index : indexOfMissingMeasurements) {
-        if (template.hasSchema(measurements[index])) {
-          shouldActivateTemplate = true;
-          break;
-        }
-      }
-
-      if (shouldActivateTemplate) {
-        internalActivateTemplate(devicePath);
-        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
-        for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
-          if (!template.hasSchema(measurements[i])) {
-            
recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
-          }
-        }
-        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
-        for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
-          schemaTree.appendSingleMeasurement(
-              devicePath.concatNode(entry.getKey()),
-              (MeasurementSchema) entry.getValue(),
-              null,
-              null,
-              template.isDirectAligned());
-        }
-
-        if (indexOfMissingMeasurements.isEmpty()) {
-          return;
-        }
-      }
-    }
-
-    // auto create the rest missing timeseries
-    List<String> missingMeasurements = new 
ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSDataType> dataTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSEncoding> encodingsOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<CompressionType> compressionTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    indexOfMissingMeasurements.forEach(
-        index -> {
-          TSDataType tsDataType = getDataType.apply(index);
-          // tsDataType == null means insert null value to a non-exist series
-          // should skip creating them
-          if (tsDataType != null) {
-            missingMeasurements.add(measurements[index]);
-            dataTypesOfMissingMeasurement.add(tsDataType);
-            encodingsOfMissingMeasurement.add(
-                encodings == null ? getDefaultEncoding(tsDataType) : 
encodings[index]);
-            compressionTypesOfMissingMeasurement.add(
-                compressionTypes == null
-                    ? 
TSFileDescriptor.getInstance().getConfig().getCompressor()
-                    : compressionTypes[index]);
-          }
-        });
-
-    if (!missingMeasurements.isEmpty()) {
-      schemaTree.mergeSchemaTree(
-          internalCreateTimeseries(
-              devicePath,
-              missingMeasurements,
-              dataTypesOfMissingMeasurement,
-              encodingsOfMissingMeasurement,
-              compressionTypesOfMissingMeasurement,
-              isAligned));
-    }
-  }
-
-  private List<Integer> checkMissingMeasurements(
-      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
-    DeviceSchemaInfo deviceSchemaInfo =
-        schemaTree.searchDeviceSchemaInfo(devicePath, 
Arrays.asList(measurements));
-    if (deviceSchemaInfo == null) {
-      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
-    }
-
-    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
-    List<MeasurementSchema> schemaList = 
deviceSchemaInfo.getMeasurementSchemaList();
-    for (int i = 0; i < measurements.length; i++) {
-      if (schemaList.get(i) == null) {
-        indexOfMissingMeasurements.add(i);
-      }
-    }
-
-    return indexOfMissingMeasurements;
-  }
-
-  // try to create the target timeseries and return schemaTree involving 
successfully created
-  // timeseries and existing timeseries
-  private ClusterSchemaTree internalCreateTimeseries(
-      PartialPath devicePath,
-      List<String> measurements,
-      List<TSDataType> tsDataTypes,
-      List<TSEncoding> encodings,
-      List<CompressionType> compressors,
-      boolean isAligned) {
-    List<MeasurementPath> measurementPathList =
-        executeInternalCreateTimeseriesStatement(
-            new InternalCreateTimeSeriesStatement(
-                devicePath, measurements, tsDataTypes, encodings, compressors, 
isAligned));
-
-    Set<Integer> alreadyExistingMeasurementIndexSet =
-        measurementPathList.stream()
-            .map(o -> measurements.indexOf(o.getMeasurement()))
-            .collect(Collectors.toSet());
-
-    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    schemaTree.appendMeasurementPaths(measurementPathList);
-
-    for (int i = 0, size = measurements.size(); i < size; i++) {
-      if (alreadyExistingMeasurementIndexSet.contains(i)) {
-        continue;
-      }
-
-      schemaTree.appendSingleMeasurement(
-          devicePath.concatNode(measurements.get(i)),
-          new MeasurementSchema(
-              measurements.get(i), tsDataTypes.get(i), encodings.get(i), 
compressors.get(i)),
-          null,
-          null,
-          isAligned);
-    }
-
-    return schemaTree;
-  }
-
-  // auto create timeseries and return the existing timeseries info
-  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
-      InternalCreateTimeSeriesStatement statement) {
-
-    ExecutionResult executionResult = executeStatement(statement);
-
-    int statusCode = executionResult.status.getCode();
-    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return Collections.emptyList();
-    }
-
-    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      throw new RuntimeException(
-          new IoTDBException(executionResult.status.getMessage(), statusCode));
-    }
-
-    Set<String> failedCreationSet = new HashSet<>();
-    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
-    for (TSStatus subStatus : executionResult.status.subStatus) {
-      if (subStatus.code == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-        alreadyExistingMeasurements.add(
-            MeasurementPath.parseDataFromString(subStatus.getMessage()));
-      } else {
-        failedCreationSet.add(subStatus.message);
-      }
-    }
-
-    if (!failedCreationSet.isEmpty()) {
-      throw new SemanticException(new MetadataException(String.join("; ", 
failedCreationSet)));
-    }
-
-    return alreadyExistingMeasurements;
-  }
-
-  public void internalActivateTemplate(PartialPath devicePath) {
-    ExecutionResult executionResult = executeStatement(new 
ActivateTemplateStatement(devicePath));
-    TSStatus status = executionResult.status;
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-      throw new RuntimeException(new IoTDBException(status.getMessage(), 
status.getCode()));
-    }
-  }
-
-  private ExecutionResult executeStatement(Statement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    return coordinator.execute(
-        statement,
-        queryId,
-        null,
-        "",
-        ClusterPartitionFetcher.getInstance(),
-        this,
-        config.getQueryTimeoutThreshold());
-  }
-
-  @Override
-  public void invalidAllCache() {
-    DataNodeSchemaCache.getInstance().cleanUp();
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..638e0206e0
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import 
org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+class AutoCreateSchemaExecutor {
+
+  private final ITemplateManager templateManager;
+  private final Function<Statement, ExecutionResult> statementExecutor;
+
+  AutoCreateSchemaExecutor(
+      ITemplateManager templateManager, Function<Statement, ExecutionResult> 
statementExecutor) {
+    this.templateManager = templateManager;
+    this.statementExecutor = statementExecutor;
+  }
+
+  void autoCreateMissingMeasurements(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      TSEncoding[] encodings,
+      CompressionType[] compressionTypes,
+      boolean isAligned) {
+    // check whether there is template should be activated
+    Pair<Template, PartialPath> templateInfo = 
templateManager.checkTemplateSetInfo(devicePath);
+    if (templateInfo != null) {
+      Template template = templateInfo.left;
+      boolean shouldActivateTemplate = false;
+      for (int index : indexOfTargetMeasurements) {
+        if (template.hasSchema(measurements[index])) {
+          shouldActivateTemplate = true;
+          break;
+        }
+      }
+
+      if (shouldActivateTemplate) {
+        internalActivateTemplate(devicePath);
+        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+        for (int i = 0; i < indexOfTargetMeasurements.size(); i++) {
+          if (!template.hasSchema(measurements[i])) {
+            
recheckedIndexOfMissingMeasurements.add(indexOfTargetMeasurements.get(i));
+          }
+        }
+        indexOfTargetMeasurements = recheckedIndexOfMissingMeasurements;
+        for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
+          schemaTree.appendSingleMeasurement(
+              devicePath.concatNode(entry.getKey()),
+              (MeasurementSchema) entry.getValue(),
+              null,
+              null,
+              template.isDirectAligned());
+        }
+
+        if (indexOfTargetMeasurements.isEmpty()) {
+          return;
+        }
+      }
+    }
+
+    // auto create the rest missing timeseries
+    List<String> missingMeasurements = new 
ArrayList<>(indexOfTargetMeasurements.size());
+    List<TSDataType> dataTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfTargetMeasurements.size());
+    List<TSEncoding> encodingsOfMissingMeasurement =
+        new ArrayList<>(indexOfTargetMeasurements.size());
+    List<CompressionType> compressionTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfTargetMeasurements.size());
+    indexOfTargetMeasurements.forEach(
+        index -> {
+          TSDataType tsDataType = getDataType.apply(index);
+          // tsDataType == null means insert null value to a non-exist series
+          // should skip creating them
+          if (tsDataType != null) {
+            missingMeasurements.add(measurements[index]);
+            dataTypesOfMissingMeasurement.add(tsDataType);
+            encodingsOfMissingMeasurement.add(
+                encodings == null ? getDefaultEncoding(tsDataType) : 
encodings[index]);
+            compressionTypesOfMissingMeasurement.add(
+                compressionTypes == null
+                    ? 
TSFileDescriptor.getInstance().getConfig().getCompressor()
+                    : compressionTypes[index]);
+          }
+        });
+
+    if (!missingMeasurements.isEmpty()) {
+      schemaTree.mergeSchemaTree(
+          internalCreateTimeseries(
+              devicePath,
+              missingMeasurements,
+              dataTypesOfMissingMeasurement,
+              encodingsOfMissingMeasurement,
+              compressionTypesOfMissingMeasurement,
+              isAligned));
+    }
+  }
+
+  // try to create the target timeseries and return schemaTree involving 
successfully created
+  // timeseries and existing timeseries
+  private ClusterSchemaTree internalCreateTimeseries(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    List<MeasurementPath> measurementPathList =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateTimeSeriesStatement(
+                devicePath, measurements, tsDataTypes, encodings, compressors, 
isAligned));
+
+    Set<Integer> alreadyExistingMeasurementIndexSet =
+        measurementPathList.stream()
+            .map(o -> measurements.indexOf(o.getMeasurement()))
+            .collect(Collectors.toSet());
+
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    schemaTree.appendMeasurementPaths(measurementPathList);
+
+    for (int i = 0, size = measurements.size(); i < size; i++) {
+      if (alreadyExistingMeasurementIndexSet.contains(i)) {
+        continue;
+      }
+
+      schemaTree.appendSingleMeasurement(
+          devicePath.concatNode(measurements.get(i)),
+          new MeasurementSchema(
+              measurements.get(i), tsDataTypes.get(i), encodings.get(i), 
compressors.get(i)),
+          null,
+          null,
+          isAligned);
+    }
+
+    return schemaTree;
+  }
+
+  // auto create timeseries and return the existing timeseries info
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
+      InternalCreateTimeSeriesStatement statement) {
+
+    ExecutionResult executionResult = statementExecutor.apply(statement);
+
+    int statusCode = executionResult.status.getCode();
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(
+          new IoTDBException(executionResult.status.getMessage(), statusCode));
+    }
+
+    Set<String> failedCreationSet = new HashSet<>();
+    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
+    for (TSStatus subStatus : executionResult.status.subStatus) {
+      if (subStatus.code == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        alreadyExistingMeasurements.add(
+            MeasurementPath.parseDataFromString(subStatus.getMessage()));
+      } else {
+        failedCreationSet.add(subStatus.message);
+      }
+    }
+
+    if (!failedCreationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", 
failedCreationSet)));
+    }
+
+    return alreadyExistingMeasurements;
+  }
+
+  private void internalActivateTemplate(PartialPath devicePath) {
+    ExecutionResult executionResult =
+        statementExecutor.apply(new ActivateTemplateStatement(devicePath));
+    TSStatus status = executionResult.status;
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      throw new RuntimeException(new IoTDBException(status.getMessage(), 
status.getCode()));
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..3f8e609604
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class ClusterSchemaFetchExecutor {
+
+  private final Coordinator coordinator;
+  private final ITemplateManager templateManager;
+  private final Supplier<Long> queryIdProvider;
+  private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+  private final Consumer<ClusterSchemaTree> schemaCacheUpdater;
+
+  ClusterSchemaFetchExecutor(
+      Coordinator coordinator,
+      ITemplateManager templateManager,
+      Supplier<Long> queryIdProvider,
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+      Consumer<ClusterSchemaTree> schemaCacheUpdater) {
+    this.coordinator = coordinator;
+    this.templateManager = templateManager;
+    this.queryIdProvider = queryIdProvider;
+    this.statementExecutor = statementExecutor;
+    this.schemaCacheUpdater = schemaCacheUpdater;
+  }
+
+  /**
+   * This method is used for scenarios that patternTree may have wildcard or 
there's no need to
+   * cache the result.
+   */
+  ClusterSchemaTree fetchSchemaOfFuzzyMatch(PathPatternTree patternTree, 
boolean withTags) {
+    Map<Integer, Template> templateMap = new HashMap<>();
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    for (PartialPath pattern : pathPatternList) {
+      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
+    }
+    return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, 
templateMap, withTags));
+  }
+
+  /**
+   * This method is used for scenarios that patternTree has no wildcard and 
the result should be
+   * cached.
+   *
+   * @param fullPathList all the fullPath without wildcard split from 
rawPatternTree
+   * @param rawPatternTree the pattern tree consisting of the fullPathList
+   * @return fetched schema
+   */
+  ClusterSchemaTree fetchSchemaOfPreciseMatch(
+      List<PartialPath> fullPathList, PathPatternTree rawPatternTree) {
+    ClusterSchemaTree schemaTree =
+        executeSchemaFetchQuery(
+            new SchemaFetchStatement(rawPatternTree, 
analyzeTemplate(fullPathList), false));
+    if (!schemaTree.isEmpty()) {
+      schemaCacheUpdater.accept(schemaTree);
+    }
+    return schemaTree;
+  }
+
+  ClusterSchemaTree fetchSchemaOfOneDevice(
+      PartialPath devicePath, String[] measurements, List<Integer> 
indexOfTargetMeasurements) {
+    PathPatternTree patternTree = new PathPatternTree();
+    for (int index : indexOfTargetMeasurements) {
+      patternTree.appendFullPath(devicePath, measurements[index]);
+    }
+    patternTree.constructTree();
+    return fetchSchemaAndCacheResult(patternTree);
+  }
+
+  ClusterSchemaTree fetchSchemaOfMultiDevices(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<List<Integer>> indexOfTargetMeasurementsList) {
+    PathPatternTree patternTree = new PathPatternTree();
+    for (int i = 0; i < devicePathList.size(); i++) {
+      for (int index : indexOfTargetMeasurementsList.get(i)) {
+        patternTree.appendFullPath(devicePathList.get(i), 
measurementsList.get(i)[index]);
+      }
+    }
+    patternTree.constructTree();
+    return fetchSchemaAndCacheResult(patternTree);
+  }
+
+  private ClusterSchemaTree fetchSchemaAndCacheResult(PathPatternTree 
patternTree) {
+    ClusterSchemaTree schemaTree =
+        executeSchemaFetchQuery(
+            new SchemaFetchStatement(
+                patternTree, 
analyzeTemplate(patternTree.getAllPathPatterns()), false));
+    if (!schemaTree.isEmpty()) {
+      schemaCacheUpdater.accept(schemaTree);
+    }
+    return schemaTree;
+  }
+
+  private Map<Integer, Template> analyzeTemplate(List<PartialPath> 
pathPatternList) {
+    Map<Integer, Template> templateMap = new HashMap<>();
+    for (PartialPath pattern : pathPatternList) {
+      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
+    }
+    return templateMap;
+  }
+
+  private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement 
schemaFetchStatement) {
+    long queryId = queryIdProvider.get();
+    try {
+      ExecutionResult executionResult = statementExecutor.apply(queryId, 
schemaFetchStatement);
+      if (executionResult.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException(
+            String.format(
+                "cannot fetch schema, status is: %s, msg is: %s",
+                executionResult.status.getCode(), 
executionResult.status.getMessage()));
+      }
+      try (SetThreadName threadName = new 
SetThreadName(executionResult.queryId.getId())) {
+        ClusterSchemaTree result = new ClusterSchemaTree();
+        Set<String> databaseSet = new HashSet<>();
+        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+          // The query will be transited to FINISHED when invoking 
getBatchResult() at the last time
+          // So we don't need to clean up it manually
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
+          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+            break;
+          }
+          Column column = tsBlock.get().getColumn(0);
+          for (int i = 0; i < column.getPositionCount(); i++) {
+            parseFetchedData(column.getBinary(i), result, databaseSet);
+          }
+        }
+        result.setDatabases(databaseSet);
+        return result;
+      }
+    } finally {
+      coordinator.cleanupQueryExecution(queryId);
+    }
+  }
+
+  private void parseFetchedData(
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> 
databaseSet) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        
resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of 
unrecognized data"));
+      }
+    } catch (IOException e) {
+      // Totally memory operation. This case won't happen.
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
new file mode 100644
index 0000000000..fb835ff76f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ClusterSchemaFetcher implements ISchemaFetcher {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private final Coordinator coordinator = Coordinator.getInstance();
+  private final DataNodeSchemaCache schemaCache = 
DataNodeSchemaCache.getInstance();
+  private final ITemplateManager templateManager = 
ClusterTemplateManager.getInstance();
+
+  private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+      new AutoCreateSchemaExecutor(
+          templateManager,
+          statement -> {
+            long queryId = SessionManager.getInstance().requestQueryId();
+            return coordinator.execute(
+                statement,
+                queryId,
+                null,
+                "",
+                ClusterPartitionFetcher.getInstance(),
+                this,
+                config.getQueryTimeoutThreshold());
+          });
+  private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+      new ClusterSchemaFetchExecutor(
+          coordinator,
+          templateManager,
+          () -> SessionManager.getInstance().requestQueryId(),
+          (queryId, statement) ->
+              coordinator.execute(
+                  statement,
+                  queryId,
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()),
+          schemaCache::put);
+
+  private static final class ClusterSchemaFetcherHolder {
+    private static final ClusterSchemaFetcher INSTANCE = new 
ClusterSchemaFetcher();
+
+    private ClusterSchemaFetcherHolder() {}
+  }
+
+  public static ClusterSchemaFetcher getInstance() {
+    return ClusterSchemaFetcherHolder.INSTANCE;
+  }
+
+  private ClusterSchemaFetcher() {}
+
+  @Override
+  public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    List<PartialPath> fullPathList = new ArrayList<>();
+    for (PartialPath pattern : pathPatternList) {
+      if (!pattern.hasWildcard()) {
+        fullPathList.add(pattern);
+      }
+    }
+
+    if (fullPathList.size() < pathPatternList.size()) {
+      return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, 
false);
+    }
+
+    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree;
+      if (fullPathList.size() == pathPatternList.size()) {
+        boolean isAllCached = true;
+        schemaTree = new ClusterSchemaTree();
+        ClusterSchemaTree cachedSchema;
+        Set<String> storageGroupSet = new HashSet<>();
+        for (PartialPath fullPath : fullPathList) {
+          cachedSchema = schemaCache.get(fullPath);
+          if (cachedSchema.isEmpty()) {
+            isAllCached = false;
+            break;
+          } else {
+            schemaTree.mergeSchemaTree(cachedSchema);
+            storageGroupSet.addAll(cachedSchema.getDatabases());
+          }
+        }
+        if (isAllCached) {
+          schemaTree.setDatabases(storageGroupSet);
+          return schemaTree;
+        }
+      }
+
+      return 
clusterSchemaFetchExecutor.fetchSchemaOfPreciseMatch(fullPathList, patternTree);
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, 
true);
+  }
+
+  @Override
+  public ISchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean isAligned) {
+    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
+      List<Integer> indexOfMissingMeasurements =
+          checkMissingMeasurements(schemaTree, devicePath, measurements);
+
+      // all schema can be taken from cache
+      if (indexOfMissingMeasurements.isEmpty()) {
+        return schemaTree;
+      }
+
+      // try fetch the missing schema from remote and cache fetched schema
+      ClusterSchemaTree remoteSchemaTree =
+          clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+              devicePath, measurements, indexOfMissingMeasurements);
+      if (!remoteSchemaTree.isEmpty()) {
+        schemaTree.mergeSchemaTree(remoteSchemaTree);
+      }
+
+      if (!config.isAutoCreateSchemaEnabled()) {
+        return schemaTree;
+      }
+
+      // auto create the still missing schema and merge them into schemaTree
+      checkAndAutoCreateMissingMeasurements(
+          schemaTree,
+          devicePath,
+          indexOfMissingMeasurements,
+          measurements,
+          getDataType,
+          null,
+          null,
+          isAligned);
+
+      return schemaTree;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public ISchemaTree fetchSchemaListWithAutoCreate(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<Boolean> isAlignedList) {
+    return fetchSchemaListWithAutoCreate(
+        devicePathList, measurementsList, tsDataTypesList, null, null, 
isAlignedList);
+  }
+
+  @Override
+  public ISchemaTree fetchSchemaListWithAutoCreate(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<TSEncoding[]> encodingsList,
+      List<CompressionType[]> compressionTypesList,
+      List<Boolean> isAlignedList) {
+    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+      List<List<Integer>> indexOfMissingMeasurementsList = new 
ArrayList<>(devicePathList.size());
+      boolean hasMissingMeasurement = false;
+      for (int i = 0; i < devicePathList.size(); i++) {
+        schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), 
measurementsList.get(i)));
+        List<Integer> indexOfMissingMeasurements =
+            checkMissingMeasurements(schemaTree, devicePathList.get(i), 
measurementsList.get(i));
+        indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+        if (!indexOfMissingMeasurements.isEmpty()) {
+          hasMissingMeasurement = true;
+        }
+      }
+
+      // all schema can be taken from cache
+      if (!hasMissingMeasurement) {
+        return schemaTree;
+      }
+
+      // try fetch the missing schema from remote and cache fetched schema
+      ClusterSchemaTree remoteSchemaTree =
+          clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
+              devicePathList, measurementsList, 
indexOfMissingMeasurementsList);
+      if (!remoteSchemaTree.isEmpty()) {
+        schemaTree.mergeSchemaTree(remoteSchemaTree);
+      }
+
+      if (!config.isAutoCreateSchemaEnabled()) {
+        return schemaTree;
+      }
+
+      // auto create the still missing schema and merge them into schemaTree
+      for (int i = 0; i < devicePathList.size(); i++) {
+        int finalI = i;
+        checkAndAutoCreateMissingMeasurements(
+            schemaTree,
+            devicePathList.get(i),
+            indexOfMissingMeasurementsList.get(i),
+            measurementsList.get(i),
+            index -> tsDataTypesList.get(finalI)[index],
+            encodingsList == null ? null : encodingsList.get(i),
+            compressionTypesList == null ? null : compressionTypesList.get(i),
+            isAlignedList.get(i));
+      }
+      return schemaTree;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+    return templateManager.checkTemplateSetInfo(path);
+  }
+
+  @Override
+  public Map<Integer, Template> checkAllRelatedTemplate(PartialPath 
pathPattern) {
+    return templateManager.checkAllRelatedTemplate(pathPattern);
+  }
+
+  @Override
+  public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String 
templateName) {
+    return templateManager.getAllPathsSetTemplate(templateName);
+  }
+
+  // check which measurements are missing and auto create the missing 
measurements and merge them
+  // into given schemaTree
+  private void checkAndAutoCreateMissingMeasurements(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfMissingMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      TSEncoding[] encodings,
+      CompressionType[] compressionTypes,
+      boolean isAligned) {
+    // check missing measurements
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(
+            devicePath,
+            indexOfMissingMeasurements.stream()
+                .map(index -> measurements[index])
+                .collect(Collectors.toList()));
+    if (deviceSchemaInfo != null) {
+      List<MeasurementSchema> schemaList = 
deviceSchemaInfo.getMeasurementSchemaList();
+      int removedCount = 0;
+      for (int i = 0, size = schemaList.size(); i < size; i++) {
+        if (schemaList.get(i) != null) {
+          indexOfMissingMeasurements.remove(i - removedCount);
+          removedCount++;
+        }
+      }
+    }
+    if (indexOfMissingMeasurements.isEmpty()) {
+      return;
+    }
+
+    autoCreateSchemaExecutor.autoCreateMissingMeasurements(
+        schemaTree,
+        devicePath,
+        indexOfMissingMeasurements,
+        measurements,
+        getDataType,
+        encodings,
+        compressionTypes,
+        isAligned);
+  }
+
+  private List<Integer> checkMissingMeasurements(
+      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(devicePath, 
Arrays.asList(measurements));
+    if (deviceSchemaInfo == null) {
+      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    List<MeasurementSchema> schemaList = 
deviceSchemaInfo.getMeasurementSchemaList();
+    for (int i = 0; i < measurements.length; i++) {
+      if (schemaList.get(i) == null) {
+        indexOfMissingMeasurements.add(i);
+      }
+    }
+
+    return indexOfMissingMeasurements;
+  }
+
+  @Override
+  public void invalidAllCache() {
+    DataNodeSchemaCache.getInstance().cleanUp();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
similarity index 97%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 5ec75fe1da..37516321c0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
similarity index 98%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index cb9b8b8899..3882d6e34d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 99a7f606c7..2c87021c56 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -36,8 +36,8 @@ import 
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
 import 
org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java 
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a55522c185..53d5241ef2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
index c4575725bc..f5e1c31de4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
index 0eeea0af1d..2f00f2e5dd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
index 337a3ec65d..caf253c1eb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
@@ -27,9 +27,9 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 49eb8abdff..29d07a462d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -35,9 +35,9 @@ import 
org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 40c9ef556a..75f0bc0421 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -76,9 +76,9 @@ import 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java 
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index ccc95932b5..d5df9a8a73 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.qp.utils.DateTimeUtils;
 import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java 
b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
index 5936207a55..05ffb9c7df 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.sync.pipedata.load;
 
 import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 
 /**
  * This interface is used to load files, including tsFile, syncTask, schema, 
modsFile and
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index d0c348fda1..a8ffdab2d3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
 
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index f468e09d9b..89430bc91f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 61951feee4..a65b83d03a 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index a4a3c6c580..7c908b2f4d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -46,7 +46,7 @@ import 
org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;

Reply via email to