This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ac19d1d238c7d719ee62a4aa95e6e4d9d94c8d37
Merge: 7313ad4850c 180e8fb422b
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Jul 9 16:57:30 2024 +0800

    Merge branch 'ty/TableModelGrammar' into TableModelIngestion
    
    # Conflicts:
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
    #       
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java

 .../iotdb/session/it/IoTDBSessionRelationalIT.java |    2 +-
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  |   48 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |    3 +
 .../iotdb/confignode/manager/ConfigManager.java    |   22 +
 .../apache/iotdb/confignode/manager/IManager.java  |    9 +
 .../iotdb/confignode/manager/ProcedureManager.java |  183 +-
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     |   16 +-
 .../manager/schema/ClusterSchemaManager.java       |   18 +-
 .../confignode/persistence/schema/ConfigMTree.java |   38 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |    4 +
 .../impl/schema/table/AddTableColumnProcedure.java |   13 +-
 .../state/schema/AddTableColumnState.java          |    4 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |    8 +
 .../metadata/DatabaseAlreadySetException.java      |   29 +-
 ...ception.java => DatabaseConflictException.java} |   26 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  104 +-
 .../iotdb/db/protocol/client/ConfigNodeClient.java |   17 +
 .../iotdb/db/protocol/session/IClientSession.java  |    1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |    7 +
 .../source/relational/TableScanOperator.java       |    4 +-
 .../plan/analyze/ClusterPartitionFetcher.java      |   52 +-
 .../plan/analyze/LoadTsfileAnalyzer.java           |    7 +-
 .../analyze/cache/partition/PartitionCache.java    |   50 +-
 .../cache/schema/dualkeycache/IDualKeyCache.java   |   13 +
 .../schema/dualkeycache/impl/DualKeyCacheImpl.java |   37 +
 .../plan/analyze/schema/SchemaValidator.java       |    5 +-
 .../relational/AlterTableAddColumnTask.java        |    3 +
 .../TableModelStatementMemorySourceVisitor.java    |   30 +-
 .../plan/planner/LocalExecutionPlanner.java        |   13 +-
 .../plan/planner/OperatorTreeGenerator.java        |    1 +
 .../plan/planner/plan/node/PlanNode.java           |    2 +-
 .../plan/planner/plan/node/PlanNodeType.java       |    1 +
 .../planner/plan/parameter/SeriesScanOptions.java  |   11 +
 .../plan/relational/metadata/ColumnSchema.java     |    1 +
 .../plan/relational/metadata/Metadata.java         |   24 +-
 .../plan/relational/metadata/MetadataUtil.java     |    2 +-
 .../relational/metadata/TableMetadataImpl.java     |    8 +-
 .../metadata/fetcher/TableDeviceSchemaFetcher.java |  114 +-
 .../fetcher/TableDeviceSchemaValidator.java        |  122 +-
 .../fetcher/TableHeaderSchemaValidator.java        |  118 +-
 .../fetcher/cache/TableDeviceCacheEntry.java       |   25 +-
 .../metadata/fetcher/cache/TableDeviceId.java      |   12 +-
 .../fetcher/cache/TableDeviceSchemaCache.java      |   11 +-
 .../relational/metadata/fetcher/cache/TableId.java |   16 +-
 .../plan/relational/planner/LogicalPlanner.java    |   35 +-
 .../plan/relational/planner/RelationPlanner.java   |   22 +-
 .../plan/relational/planner/Symbol.java            |    5 +
 .../planner/distribute/AddExchangeNodes.java       |   94 +
 .../distribute/DistributedPlanGenerator.java       |  495 +++++
 .../planner/distribute/ExchangeNodeGenerator.java  |  219 --
 .../planner/distribute/SimplePlanRewriter.java     |   44 -
 .../distribute/TableDistributionPlanner.java       |   47 +-
 .../TableModelTypeProviderExtractor.java           |    7 +
 .../plan/relational/planner/node/CollectNode.java  |   20 +
 .../planner/node/CreateTableDeviceNode.java        |   50 +-
 .../plan/relational/planner/node/FilterNode.java   |   21 +
 .../plan/relational/planner/node/LimitNode.java    |   22 +-
 .../relational/planner/node/MergeSortNode.java     |   21 +
 .../plan/relational/planner/node/OffsetNode.java   |   20 +
 .../plan/relational/planner/node/OutputNode.java   |   43 +-
 .../plan/relational/planner/node/ProjectNode.java  |   20 +
 .../plan/relational/planner/node/SortNode.java     |   44 +-
 .../relational/planner/node/StreamSortNode.java    |   43 +-
 .../relational/planner/node/TableScanNode.java     |   68 +-
 .../plan/relational/planner/node/TopKNode.java     |   59 +-
 .../planner/optimizations/PruneUnUsedColumns.java  |   16 +-
 .../optimizations/PushPredicateIntoTableScan.java  |   95 +-
 .../RemoveRedundantIdentityProjections.java        |    2 +-
 .../planner/optimizations/SimplifyExpressions.java |    2 +-
 ...lPlanOptimizer.java => TablePlanOptimizer.java} |    2 +-
 .../attribute/DeviceAttributeStore.java            |   28 +-
 .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java     |    4 +-
 .../read/resp/info/impl/ShowDevicesResult.java     |    3 +-
 .../db/schemaengine/table/DataNodeTableCache.java  |   13 +
 .../iotdb/db/schemaengine/table/ITableCache.java   |    5 +
 .../org/apache/iotdb/db/utils/CommonUtils.java     |    7 +
 .../plan/relational/analyzer/AnalyzerTest.java     |  189 +-
 .../analyzer/MockTableModelDataPartition.java      |  166 ++
 .../relational/analyzer/MockTablePartition.java    |  174 --
 .../plan/relational/analyzer/SortTest.java         | 2251 ++++++++++++++++++++
 .../plan/relational/analyzer/TestMatadata.java     |   35 +-
 .../iotdb/commons/partition/DataPartition.java     |   17 +-
 .../apache/iotdb/commons/schema/table/TsTable.java |   90 +-
 .../table/column/TsTableColumnSchemaUtil.java      |    2 +-
 .../src/main/thrift/confignode.thrift              |   10 +
 85 files changed, 4605 insertions(+), 1139 deletions(-)

diff --cc 
integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index a7439f8b1af,00000000000..e6ad59da345
mode 100644,000000..100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@@ -1,174 -1,0 +1,174 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.session.it;
 +
 +import org.apache.iotdb.isession.ISession;
 +import org.apache.iotdb.isession.SessionDataSet;
 +import org.apache.iotdb.it.env.EnvFactory;
 +import org.apache.iotdb.it.framework.IoTDBTestRunner;
 +import org.apache.iotdb.itbase.category.ClusterIT;
 +import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 +import org.apache.iotdb.rpc.IoTDBConnectionException;
 +import org.apache.iotdb.rpc.StatementExecutionException;
 +
 +import org.apache.iotdb.session.Session;
 +import org.apache.tsfile.enums.TSDataType;
 +import org.apache.tsfile.read.common.RowRecord;
 +import org.apache.tsfile.write.record.Tablet;
 +import org.apache.tsfile.write.record.Tablet.ColumnType;
 +import org.apache.tsfile.write.schema.IMeasurementSchema;
 +import org.apache.tsfile.write.schema.MeasurementSchema;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.junit.runner.RunWith;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +
 +import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
 +import static org.junit.Assert.assertEquals;
 +
 +@RunWith(IoTDBTestRunner.class)
 +public class IoTDBSessionRelationalIT {
 +
 +  private static Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSessionRelationalIT.class);
 +
 +  @Before
 +  public void setUp() throws Exception {
 +    EnvFactory.getEnv().initClusterEnvironment();
 +    try (ISession session = 
EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {
 +      session.executeNonQueryStatement("CREATE DATABASE db1");
 +    }
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    try (ISession session = 
EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {
 +      session.executeNonQueryStatement("DROP DATABASE db1");
 +    }
 +    EnvFactory.getEnv().cleanClusterEnvironment();
 +  }
 +
 +  public static void main(String[] args)
 +      throws IoTDBConnectionException, StatementExecutionException {
 +    try (ISession session =
 +        new 
Session.Builder().host("127.0.0.1").port(6667).sqlDialect(TABLE_SQL_DIALECT).build())
 {
 +      session.open();
 +      try {
 +        session.executeNonQueryStatement("DROP DATABASE db1");
 +      } catch (Exception ignored) {
 +
 +      }
 +      session.executeNonQueryStatement("CREATE DATABASE db1");
-       session.executeNonQueryStatement("USE db1");
++      session.executeNonQueryStatement("USE \"db1\"");
 +      session.executeNonQueryStatement(
 +          "CREATE TABLE table1 (id1 string id, attr1 string attribute, "
 +              + "m1 double "
 +              + "measurement)");
 +
 +      List<IMeasurementSchema> schemaList = new ArrayList<>();
 +      schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
 +      schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
 +      schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
 +      final List<ColumnType> columnTypes =
 +          Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, 
ColumnType.MEASUREMENT);
 +
 +      Tablet tablet = new Tablet("table1", schemaList, columnTypes, 10);
 +
 +      long timestamp = System.currentTimeMillis();
 +
 +      for (long row = 0; row < 15; row++) {
 +        int rowIndex = tablet.rowSize++;
 +        tablet.addTimestamp(rowIndex, timestamp + row);
 +        tablet.addValue("id1", rowIndex, "id:" + row);
 +        tablet.addValue("attr1", rowIndex, "attr:" + row);
 +        tablet.addValue("m1", rowIndex, row * 1.0);
 +        if (tablet.rowSize == tablet.getMaxRowNumber()) {
 +          session.insertRelationalTablet(tablet, true);
 +          tablet.reset();
 +        }
 +        timestamp++;
 +      }
 +
 +      if (tablet.rowSize != 0) {
 +        session.insertRelationalTablet(tablet);
 +        tablet.reset();
 +      }
 +
 +      SessionDataSet dataSet = session.executeQueryStatement("select count(*) 
from table1");
 +      while (dataSet.hasNext()) {
 +        RowRecord rowRecord = dataSet.next();
 +        assertEquals(15L, rowRecord.getFields().get(0).getLongV());
 +      }
 +    }
 +  }
 +
 +  @Test
 +  @Category({LocalStandaloneIT.class, ClusterIT.class})
 +  public void insertRelationalTabletTest()
 +      throws IoTDBConnectionException, StatementExecutionException {
 +    try (ISession session = 
EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {
 +      session.executeNonQueryStatement("USE db1");
 +      session.executeNonQueryStatement(
 +          "CREATE TABLE table1 (id1 string id, attr1 string attribute, "
 +              + "m1 double "
 +              + "measurement)");
 +
 +      List<IMeasurementSchema> schemaList = new ArrayList<>();
 +      schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
 +      schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
 +      schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
 +      final List<ColumnType> columnTypes =
 +          Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, 
ColumnType.MEASUREMENT);
 +
 +      Tablet tablet = new Tablet("table1", schemaList, columnTypes, 10);
 +
 +      long timestamp = System.currentTimeMillis();
 +
 +      for (long row = 0; row < 15; row++) {
 +        int rowIndex = tablet.rowSize++;
 +        tablet.addTimestamp(rowIndex, timestamp + row);
 +        tablet.addValue("id1", rowIndex, "id:" + row);
 +        tablet.addValue("attr1", rowIndex, "attr:" + row);
 +        tablet.addValue("m1", rowIndex, row * 1.0);
 +        if (tablet.rowSize == tablet.getMaxRowNumber()) {
 +          session.insertRelationalTablet(tablet, true);
 +          tablet.reset();
 +        }
 +        timestamp++;
 +      }
 +
 +      if (tablet.rowSize != 0) {
 +        session.insertRelationalTablet(tablet);
 +        tablet.reset();
 +      }
 +
 +      SessionDataSet dataSet = session.executeQueryStatement("select count(*) 
from table1");
 +      while (dataSet.hasNext()) {
 +        RowRecord rowRecord = dataSet.next();
 +        assertEquals(15L, rowRecord.getFields().get(0).getLongV());
 +      }
 +    }
 +  }
 +}
diff --cc 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index fddc92e40e9,fddc92e40e9..b3a91e4aa6a
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@@ -754,6 -754,6 +754,28 @@@ public class ConfigManager implements I
      return resp;
    }
  
++  @Override
++  public TSchemaPartitionTableResp getSchemaPartition(
++      Map<String, List<TSeriesPartitionSlot>> dbSlotMap) {
++    // Construct empty response
++    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
++    // Return empty resp if the partitionSlotsMap is empty
++    if (dbSlotMap.isEmpty()) {
++      return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new 
HashMap<>());
++    }
++
++    GetSchemaPartitionPlan getSchemaPartitionPlan =
++        new GetSchemaPartitionPlan(
++             dbSlotMap.entrySet().stream()
++                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue()))));
++    SchemaPartitionResp queryResult = 
partitionManager.getSchemaPartition(getSchemaPartitionPlan);
++    resp = queryResult.convertToRpcSchemaPartitionTableResp();
++
++    LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", 
dbSlotMap, resp);
++
++    return resp;
++  }
++
    @Override
    public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree 
patternTree) {
      // Construct empty response
diff --cc 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 98695d1eb7b,98695d1eb7b..c280f0fa2a8
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@@ -19,10 -19,10 +19,12 @@@
  
  package org.apache.iotdb.confignode.manager;
  
++import java.util.Map;
  import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
  import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
  import org.apache.iotdb.common.rpc.thrift.TFlushReq;
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
  import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
  import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
  import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
@@@ -355,6 -355,6 +357,13 @@@ public interface IManager 
     */
    TSchemaPartitionTableResp getSchemaPartition(PathPatternTree patternTree);
  
++  /**
++   * Get SchemaPartition with {@code <databaseName, seriesSlot>}.
++   *
++   * @return TSchemaPartitionTableResp
++   */
++  TSchemaPartitionTableResp getSchemaPartition(Map<String, 
List<TSeriesPartitionSlot>> dbSlotMap);
++
++
    /**
     * Get or create SchemaPartition.
     *
diff --cc 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 28978ab6648,28978ab6648..df2445c67cf
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@@ -19,11 -19,11 +19,13 @@@
  
  package org.apache.iotdb.confignode.service.thrift;
  
++import java.util.Map;
  import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
  import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
  import org.apache.iotdb.common.rpc.thrift.TFlushReq;
  import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
  import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
  import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
  import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@@ -530,6 -530,6 +532,12 @@@ public class ConfigNodeRPCServiceProces
      return configManager.getOrCreateSchemaPartition(patternTree);
    }
  
++  @Override
++  public TSchemaPartitionTableResp getOrCreateSchemaPartitionTableWithSlots(
++      Map<String, List<TSeriesPartitionSlot>> dbSlotMap) throws TException {
++    // NOTE(review): original hunk contained the truncated statement
++    // "return configManager.geto;" which cannot compile; the parameter type also
++    // disagreed with the List-valued map used by the same RPC in ConfigNodeClient.
++    // Reconstructed intended delegation — confirm the target ConfigManager method name.
++    return configManager.getOrCreateSchemaPartition(dbSlotMap);
++  }
++
    @Override
    public TSchemaNodeManagementResp 
getSchemaNodeManagementPartition(TSchemaNodeManagementReq req) {
      PathPatternTree patternTree =
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 0b5ec89dacf,0b5ec89dacf..de4b063fe79
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@@ -19,12 -19,12 +19,14 @@@
  
  package org.apache.iotdb.db.protocol.client;
  
++import java.util.Map;
  import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
  import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
  import org.apache.iotdb.common.rpc.thrift.TEndPoint;
  import org.apache.iotdb.common.rpc.thrift.TFlushReq;
  import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
  import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
  import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
  import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@@ -527,6 -527,6 +529,13 @@@ public class ConfigNodeClient implement
          () -> client.getSchemaPartitionTable(req), resp -> 
!updateConfigNodeLeader(resp.status));
    }
  
++  @Override
++  public TSchemaPartitionTableResp getSchemaPartitionTableWithSlots(
++      Map<String, List<TSeriesPartitionSlot>> dbSlotMap) throws TException {
++    return executeRemoteCallWithRetry(
++        () -> client.getSchemaPartitionTableWithSlots(dbSlotMap), resp -> 
!updateConfigNodeLeader(resp.status));
++  }
++
    @Override
    public TSchemaPartitionTableResp 
getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
        throws TException {
@@@ -535,6 -535,6 +544,14 @@@
          resp -> !updateConfigNodeLeader(resp.status));
    }
  
++  @Override
++  public TSchemaPartitionTableResp getOrCreateSchemaPartitionTableWithSlots(
++      Map<String, List<TSeriesPartitionSlot>> dbSlotMap) throws TException {
++    return executeRemoteCallWithRetry(
++        () -> client.getOrCreateSchemaPartitionTableWithSlots(dbSlotMap),
++        resp -> !updateConfigNodeLeader(resp.status));
++  }
++
    @Override
    public TSchemaNodeManagementResp 
getSchemaNodeManagementPartition(TSchemaNodeManagementReq req)
        throws TException {
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
index edba5195ce8,edba5195ce8..d50ab5e61d3
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
@@@ -20,6 -20,6 +20,7 @@@
  package org.apache.iotdb.db.protocol.session;
  
  import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
++import org.apache.iotdb.db.utils.CommonUtils;
  import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
  import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
  
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 2f1e93d4598,1f4f63f6d2d..f3e634b054f
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@@ -97,23 -98,23 +98,44 @@@ public class ClusterPartitionFetcher im
  
    @Override
    public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
--    return getOrCreateSchemaPartition(patternTree, false, null);
++    try (ConfigNodeClient client =
++        
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
++      patternTree.constructTree();
++      List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
++      Map<String, List<IDeviceID>> storageGroupToDeviceMap =
++          partitionCache.getDatabaseToDevice(deviceIDs, true, false, null);
++      SchemaPartition schemaPartition = 
partitionCache.getSchemaPartition(storageGroupToDeviceMap);
++      if (null == schemaPartition) {
++        TSchemaPartitionTableResp schemaPartitionTableResp =
++            
client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
++        if (schemaPartitionTableResp.getStatus().getCode()
++            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
++          schemaPartition = 
parseSchemaPartitionTableResp(schemaPartitionTableResp);
++          partitionCache.updateSchemaPartitionCache(
++              schemaPartitionTableResp.getSchemaPartitionTable());
++        } else {
++          throw new RuntimeException(
++              new IoTDBException(
++                  schemaPartitionTableResp.getStatus().getMessage(),
++                  schemaPartitionTableResp.getStatus().getCode()));
++        }
++      }
++      return schemaPartition;
++    } catch (ClientManagerException | TException e) {
++      throw new StatementAnalyzeException(
++          "An error occurred when executing getSchemaPartition():" + 
e.getMessage());
++    }
    }
  
    @Override
    public SchemaPartition getOrCreateSchemaPartition(PathPatternTree 
patternTree, String userName) {
--    return getOrCreateSchemaPartition(patternTree, true, userName);
--  }
--
--  private SchemaPartition getOrCreateSchemaPartition(
--      PathPatternTree patternTree, boolean isAutoCreate, String userName) {
      try (ConfigNodeClient client =
          
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
        patternTree.constructTree();
        List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
--      Map<String, List<IDeviceID>> databaseToDevice =
--          partitionCache.getDatabaseToDevice(deviceIDs, true, isAutoCreate, 
userName);
--      SchemaPartition schemaPartition = 
partitionCache.getSchemaPartition(databaseToDevice);
++      Map<String, List<IDeviceID>> storageGroupToDeviceMap =
++          partitionCache.getDatabaseToDevice(deviceIDs, true, true, userName);
++      SchemaPartition schemaPartition = 
partitionCache.getSchemaPartition(storageGroupToDeviceMap);
        if (null == schemaPartition) {
          TSchemaPartitionTableResp schemaPartitionTableResp =
              
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
@@@ -290,17 -291,12 +312,15 @@@
          
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
        partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate, 
userName);
        SchemaPartition schemaPartition =
-           partitionCache.getSchemaPartition(
-               new HashMap<String, List<IDeviceID>>() {
-                 {
-                   put(database, deviceIDs);
-                 }
-               });
+           
partitionCache.getSchemaPartition(Collections.singletonMap(database, 
deviceIDs));
        if (null == schemaPartition) {
          PathPatternTree tree = new PathPatternTree();
--        tree.appendPathPattern(new PartialPath(database + "." + 
MULTI_LEVEL_PATH_WILDCARD));
++//        tree.appendPathPattern(new PartialPath(database + "." + 
MULTI_LEVEL_PATH_WILDCARD));
++        for (IDeviceID deviceID : deviceIDs) {
++          tree.appendPathPattern(new PartialPath(deviceID.toString()));
++        }
          TSchemaPartitionTableResp schemaPartitionTableResp =
 -            client.getSchemaPartitionTable(constructSchemaPartitionReq(tree));
 +            
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(tree));
          if (schemaPartitionTableResp.getStatus().getCode()
              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            schemaPartition = 
parseSchemaPartitionTableResp(schemaPartitionTableResp);
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index d1c1aac54d4,7bbdd5cefc6..63f9d3afd9c
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@@ -62,21 -55,6 +62,24 @@@ public class SchemaValidator 
      }
    }
  
 +  public static void validate(
 +      Metadata metadata, WrappedInsertStatement insertStatement, 
MPPQueryContext context) {
 +    try {
 +      String databaseName = 
context.getSession().getDatabaseName().orElse(null);
 +      final TableSchema incomingSchema = insertStatement.getTableSchema();
 +      final TableSchema realSchema =
-           metadata.validateTableHeaderSchema(databaseName, incomingSchema, 
context);
++          metadata.validateTableHeaderSchema(databaseName, incomingSchema, 
context).orElse(null);
++      if (realSchema == null) {
++        throw new SemanticException("Schema validation failed, table cannot 
be created: " + incomingSchema);
++      }
 +      insertStatement.validate(realSchema);
 +      metadata.validateDeviceSchema(insertStatement, context);
 +      insertStatement.updateAfterSchemaValidation(context);
 +    } catch (QueryProcessException e) {
 +      throw new SemanticException(e.getMessage());
 +    }
 +  }
 +
    public static ISchemaTree validate(
        ISchemaFetcher schemaFetcher,
        List<PartialPath> devicePaths,
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 3b74f1b4a08,3b74f1b4a08..979a0ce4c89
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@@ -136,7 -136,7 +136,18 @@@ public class LocalExecutionPlanner 
      LocalExecutionPlanContext context =
          new LocalExecutionPlanContext(instanceContext, schemaRegion);
  
--    Operator root = plan.accept(new OperatorTreeGenerator(), context);
++    Operator root;
++    IClientSession.SqlDialect sqlDialect = 
instanceContext.getSessionInfo().getSqlDialect();
++    switch (sqlDialect) {
++      case TREE:
++        root = plan.accept(new OperatorTreeGenerator(), context);
++        break;
++      case TABLE:
++        root = plan.accept(new TableOperatorGenerator(metadata), context);
++        break;
++      default:
++        throw new IllegalArgumentException(String.format("Unknown sql 
dialect: %s", sqlDialect));
++    }
  
      PipelineMemoryEstimator memoryEstimator =
          context.constructPipelineMemoryEstimator(root, null, plan, -1);
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 71dc05861d4,71dc05861d4..c937e3e9794
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@@ -186,6 -186,6 +186,7 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SeriesSchemaFetchScanNode;
++import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ActiveRegionScanMergeNode;
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 7536c309bc6,91ab840732a..a9ece13473d
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@@ -241,8 -240,7 +241,9 @@@ public enum PlanNodeType 
    TABLE_MERGESORT_NODE((short) 1007),
    TABLE_TOPK_NODE((short) 1008),
    TABLE_COLLECT_NODE((short) 1009),
 -  TABLE_STREAM_SORT_NODE((short) 1010);
++  TABLE_STREAM_SORT_NODE((short) 1010),
 +
 +  RELATIONAL_INSERT_TABLET((short) 2000);
  
    public static final int BYTES = Short.BYTES;
  
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
index cef7ad3a799,a941dc2a272..f0fca77afcc
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
@@@ -73,16 -73,19 +74,20 @@@ public class TableHeaderSchemaValidato
      return TableHeaderSchemaValidatorHolder.INSTANCE;
    }
  
-   // This method return all the existing column schemas in the target table.
-   // When table or column is missing, this method will execute auto creation.
-   // When using SQL, the columnSchemaList could be null and there won't be 
any validation.
-   // All input column schemas will be validated and auto created when 
necessary.
-   // When the input dataType or category of one column is null, the column 
cannot be auto created.
-   public TableSchema validateTableHeaderSchema(
+   public Optional<TableSchema> validateTableHeaderSchema(
        String database, TableSchema tableSchema, MPPQueryContext context) {
+     // The schema cache R/W and fetch operations must be locked together so that the cache
+     // clearing performed by timeseries deletion takes effect.
+     
DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION);
+     context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION);
+ 
      List<ColumnSchema> inputColumnList = tableSchema.getColumns();
+     if (inputColumnList == null || inputColumnList.isEmpty()) {
+       throw new IllegalArgumentException(
+           "Column List in TableSchema should never be null or empty.");
+     }
      TsTable table = DataNodeTableCache.getInstance().getTable(database, 
tableSchema.getTableName());
 +    LOGGER.info("Get TsTable from cache: {}", table);
      List<ColumnSchema> missingColumnList = new ArrayList<>();
      List<ColumnSchema> resultColumnList = new ArrayList<>();
  
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 171159f57b6,ac17c3ff500..1aa2f80e0d0
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@@ -90,14 -88,25 +90,25 @@@ public class LogicalPlanner 
              new RemoveRedundantIdentityProjections());
    }
  
+   public LogicalPlanner(
+       MPPQueryContext context,
+       Metadata metadata,
+       SessionInfo sessionInfo,
+       List<TablePlanOptimizer> tablePlanOptimizers,
+       WarningCollector warningCollector) {
+     this.context = context;
+     this.metadata = metadata;
+     this.sessionInfo = requireNonNull(sessionInfo, "session is null");
+     this.warningCollector = requireNonNull(warningCollector, 
"warningCollector is null");
+     this.tablePlanOptimizers = tablePlanOptimizers;
+   }
+ 
    public LogicalQueryPlan plan(Analysis analysis) {
      PlanNode planNode = planStatement(analysis, analysis.getStatement());
 -
 -    tablePlanOptimizers.forEach(
 -        optimizer -> optimizer.optimize(planNode, analysis, metadata, 
sessionInfo, context));
--
 +    if (analysis.getStatement() instanceof Query) {
-       relationalPlanOptimizers.forEach(
++      tablePlanOptimizers.forEach(
 +          optimizer -> optimizer.optimize(planNode, analysis, metadata, 
sessionInfo, context));
 +    }
- 
      return new LogicalQueryPlan(context, planNode);
    }
  
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index da60edf80f7,d85fcc9056c..ac1ecdd2901
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@@ -48,7 -43,7 +49,8 @@@ import com.google.common.collect.Immuta
  import com.google.common.collect.ImmutableMap;
  
  import java.util.Collection;
 +import java.util.Collections;
+ import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 00000000000,0bf5bb32a1b..65041da0c69
mode 000000,100644..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@@ -1,0 -1,493 +1,495 @@@
+ /*
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
+ 
+ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+ import org.apache.iotdb.db.queryengine.common.QueryId;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+ import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+ import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+ import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.function.Function;
+ import java.util.stream.Collectors;
+ import java.util.stream.IntStream;
++import org.apache.iotdb.db.utils.CommonUtils;
+ 
+ import static com.google.common.collect.ImmutableList.toImmutableList;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
+ import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+ 
+ /** This class is used to generate distributed plan for table model. */
+ public class DistributedPlanGenerator
+     extends PlanVisitor<List<PlanNode>, DistributedPlanGenerator.PlanContext> 
{
+   private final QueryId queryId;
+   private final Analysis analysis;
+   Map<PlanNodeId, OrderingScheme> nodeOrderingMap = new HashMap<>();
+ 
+   public DistributedPlanGenerator(MPPQueryContext queryContext, Analysis 
analysis) {
+     this.queryId = queryContext.getQueryId();
+     this.analysis = analysis;
+   }
+ 
+   public List<PlanNode> genResult(PlanNode node, PlanContext context) {
+     return node.accept(this, context);
+   }
+ 
+   @Override
+   public List<PlanNode> visitPlan(PlanNode node, 
DistributedPlanGenerator.PlanContext context) {
+     if (node instanceof WritePlanNode) {
+       return Collections.singletonList(node);
+     }
+ 
+     List<List<PlanNode>> children =
+         node.getChildren().stream()
+             .map(child -> child.accept(this, context))
+             .collect(toImmutableList());
+ 
+     PlanNode newNode = node.clone();
+     for (List<PlanNode> planNodes : children) {
+       planNodes.forEach(newNode::addChild);
+     }
+     return Collections.singletonList(newNode);
+   }
+ 
+   @Override
+   public List<PlanNode> visitOutput(OutputNode node, PlanContext context) {
+     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+     if (childOrdering != null) {
+       nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+     }
+ 
+     if (childrenNodes.size() == 1) {
+       node.setChild(childrenNodes.get(0));
+       return Collections.singletonList(node);
+     }
+ 
+     node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
+     return Collections.singletonList(node);
+   }
+ 
+   @Override
+   public List<PlanNode> visitLimit(LimitNode node, PlanContext context) {
+     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+     if (childOrdering != null) {
+       nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+     }
+ 
+     if (childrenNodes.size() == 1) {
+       node.setChild(childrenNodes.get(0));
+       return Collections.singletonList(node);
+     }
+ 
+     // LimitNode push-down is handled later by a distributed-plan optimization rule
+     node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
+     return Collections.singletonList(node);
+   }
+ 
+   @Override
+   public List<PlanNode> visitOffset(OffsetNode node, PlanContext context) {
+     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+     if (childOrdering != null) {
+       nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+     }
+ 
+     if (childrenNodes.size() == 1) {
+       node.setChild(childrenNodes.get(0));
+       return Collections.singletonList(node);
+     }
+ 
+     node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
+     return Collections.singletonList(node);
+   }
+ 
+   @Override
+   public List<PlanNode> visitProject(ProjectNode node, PlanContext context) {
+     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+     if (childOrdering != null) {
+       nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+     }
+ 
+     if (childrenNodes.size() == 1) {
+       node.setChild(childrenNodes.get(0));
+       return Collections.singletonList(node);
+     }
+ 
+     for (Expression expression : node.getAssignments().getMap().values()) {
+       if (containsDiffFunction(expression)) {
+         node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
+         return Collections.singletonList(node);
+       }
+     }
+ 
+     List<PlanNode> resultNodeList = new ArrayList<>();
+     for (int i = 0; i < childrenNodes.size(); i++) {
+       PlanNode child = childrenNodes.get(i);
+       ProjectNode subProjectNode =
+           new ProjectNode(queryId.genPlanNodeId(), child, 
node.getAssignments());
+       resultNodeList.add(subProjectNode);
+       if (i == 0) {
+         nodeOrderingMap.put(subProjectNode.getPlanNodeId(), childOrdering);
+       }
+     }
+     return resultNodeList;
+   }
+ 
+   @Override
+   public List<PlanNode> visitSort(SortNode node, PlanContext context) {
+     context.expectedOrderingScheme = node.getOrderingScheme();
+     context.hasSortNode = true;
+     nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
+ 
+     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+     if (childrenNodes.size() == 1) {
+       node.setChild(childrenNodes.get(0));
+       return Collections.singletonList(node);
+     }
+ 
+     // A ProjectNode may be added above this SortNode later, so return a single MergeSortNode
+     // here rather than a list of SortNodes
+     MergeSortNode mergeSortNode =
+         new MergeSortNode(
+             queryId.genPlanNodeId(), node.getOrderingScheme(), 
node.getOutputSymbols());
+     for (PlanNode child : childrenNodes) {
+       SortNode subSortNode =
+           new SortNode(queryId.genPlanNodeId(), child, 
node.getOrderingScheme(), false);
+       mergeSortNode.addChild(subSortNode);
+     }
+     nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), 
mergeSortNode.getOrderingScheme());
+ 
+     return Collections.singletonList(mergeSortNode);
+   }
+ 
+   @Override
+   public List<PlanNode> visitFilter(FilterNode node, PlanContext context) {
+     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+     if (childOrdering != null) {
+       nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+     }
+ 
+     if (childrenNodes.size() == 1) {
+       node.setChild(childrenNodes.get(0));
+       return Collections.singletonList(node);
+     }
+ 
+     if (containsDiffFunction(node.getPredicate())) {
+       node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
+       return Collections.singletonList(node);
+     }
+ 
+     List<PlanNode> resultNodeList = new ArrayList<>();
+     for (int i = 0; i < childrenNodes.size(); i++) {
+       PlanNode child = childrenNodes.get(i);
+       FilterNode subFilterNode =
+           new FilterNode(queryId.genPlanNodeId(), child, node.getPredicate());
+       resultNodeList.add(subFilterNode);
+       if (i == 0) {
+         nodeOrderingMap.put(subFilterNode.getPlanNodeId(), childOrdering);
+       }
+     }
+     return resultNodeList;
+   }
+ 
+   @Override
+   public List<PlanNode> visitTableScan(TableScanNode node, PlanContext 
context) {
+ 
+     Map<TRegionReplicaSet, TableScanNode> tableScanNodeMap = new HashMap<>();
+ 
+     for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
+       List<TRegionReplicaSet> regionReplicaSets =
+           analysis
+               .getDataPartitionInfo()
+               .getDataRegionReplicaSetWithTimeFilter(
+                   node.getQualifiedObjectName().getDatabaseName(),
+                   deviceEntry.getDeviceID(),
+                   node.getTimeFilter());
+       for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+         TableScanNode tableScanNode =
+             tableScanNodeMap.computeIfAbsent(
+                 regionReplicaSet,
+                 k -> {
+                   TableScanNode scanNode =
+                       new TableScanNode(
+                           queryId.genPlanNodeId(),
+                           node.getQualifiedObjectName(),
+                           node.getOutputSymbols(),
+                           node.getAssignments(),
+                           new ArrayList<>(),
+                           node.getIdAndAttributeIndexMap(),
+                           node.getScanOrder(),
+                           node.getTimePredicate().orElse(null),
+                           node.getPushDownPredicate());
+                   scanNode.setRegionReplicaSet(regionReplicaSet);
+                   return scanNode;
+                 });
+         tableScanNode.appendDeviceEntry(deviceEntry);
+       }
+     }
+ 
+     context.hasExchangeNode = tableScanNodeMap.size() > 1;
+ 
+     List<PlanNode> resultTableScanNodeList = new ArrayList<>();
+     TRegionReplicaSet mostUsedDataRegion = null;
+     int maxDeviceEntrySizeOfTableScan = 0;
+     for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : 
tableScanNodeMap.entrySet()) {
+       TRegionReplicaSet regionReplicaSet = entry.getKey();
+       TableScanNode subTableScanNode = entry.getValue();
+       subTableScanNode.setPlanNodeId(queryId.genPlanNodeId());
+       subTableScanNode.setRegionReplicaSet(regionReplicaSet);
+       resultTableScanNodeList.add(subTableScanNode);
+ 
+       if (mostUsedDataRegion == null
+           || subTableScanNode.getDeviceEntries().size() > 
maxDeviceEntrySizeOfTableScan) {
+         mostUsedDataRegion = regionReplicaSet;
+         maxDeviceEntrySizeOfTableScan = 
subTableScanNode.getDeviceEntries().size();
+       }
+     }
+     context.mostUsedDataRegion = mostUsedDataRegion;
+ 
+     if (!context.hasSortNode) {
+       return resultTableScanNodeList;
+     }
+ 
+     processSortProperty(node, resultTableScanNodeList, context);
+     return resultTableScanNodeList;
+   }
+ 
+   private PlanNode mergeChildrenViaCollectOrMergeSort(
+       OrderingScheme childOrdering, List<PlanNode> childrenNodes) {
+     PlanNode firstChild = childrenNodes.get(0);
+ 
+     // Children have a sort property; use a MergeSortNode to merge them
+     if (childOrdering != null) {
+       MergeSortNode mergeSortNode =
+           new MergeSortNode(queryId.genPlanNodeId(), childOrdering, 
firstChild.getOutputSymbols());
+       childrenNodes.forEach(mergeSortNode::addChild);
+       nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), childOrdering);
+       return mergeSortNode;
+     }
+ 
+     // Children have no sort property; use a CollectNode to merge them
+     CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+     childrenNodes.forEach(collectNode::addChild);
+     return collectNode;
+   }
+ 
+   private void processSortProperty(
+       TableScanNode tableScanNode, List<PlanNode> resultTableScanNodeList, 
PlanContext context) {
+     List<Symbol> newOrderingSymbols = new ArrayList<>();
+     List<SortOrder> newSortOrders = new ArrayList<>();
+     OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme;
+ 
+     for (Symbol symbol : expectedOrderingScheme.getOrderBy()) {
+       if (TIMESTAMP_STR.equalsIgnoreCase(symbol.getName())) {
+         if (!expectedOrderingScheme.getOrderings().get(symbol).isAscending()) 
{
+           // TODO(beyyes) move scan order judgement into logical plan 
optimizer
+           resultTableScanNodeList.forEach(
+               node -> ((TableScanNode) node).setScanOrder(Ordering.DESC));
+         }
+         break;
+       } else if 
(!tableScanNode.getIdAndAttributeIndexMap().containsKey(symbol)) {
+         break;
+       }
+ 
+       newOrderingSymbols.add(symbol);
+       newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
+     }
+ 
+     // no sort property can be pushed down into TableScanNode
+     if (newOrderingSymbols.isEmpty()) {
+       return;
+     }
+ 
+     List<Function<DeviceEntry, String>> orderingRules = new ArrayList<>();
+     for (Symbol symbol : newOrderingSymbols) {
+       int idx = tableScanNode.getIdAndAttributeIndexMap().get(symbol);
+       if (tableScanNode.getAssignments().get(symbol).getColumnCategory()
+           == TsTableColumnCategory.ID) {
+         // segments[0] is always tableName
+         orderingRules.add(deviceEntry -> (String) 
deviceEntry.getDeviceID().getSegments()[idx + 1]);
+       } else {
+         orderingRules.add(deviceEntry -> 
deviceEntry.getAttributeColumnValues().get(idx));
+       }
+     }
+ 
+     Comparator<DeviceEntry> comparator;
+     if (newSortOrders.get(0).isNullsFirst()) {
+       comparator =
+           newSortOrders.get(0).isAscending()
+               ? 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0)))
+               : 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))).reversed();
+     } else {
+       comparator =
+           newSortOrders.get(0).isAscending()
+               ? 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(0)))
+               : 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))).reversed();
+     }
+     for (int i = 1; i < orderingRules.size(); i++) {
+       Comparator<DeviceEntry> thenComparator;
+       if (newSortOrders.get(i).isNullsFirst()) {
+         thenComparator =
+             newSortOrders.get(i).isAscending()
+                 ? 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i)))
+                 : 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))).reversed();
+       } else {
+         thenComparator =
+             newSortOrders.get(i).isAscending()
+                 ? 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i)))
+                 : 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))).reversed();
+       }
+       comparator = comparator.thenComparing(thenComparator);
+     }
+ 
+     OrderingScheme newOrderingScheme =
+         new OrderingScheme(
+             newOrderingSymbols,
+             IntStream.range(0, newOrderingSymbols.size())
+                 .boxed()
+                 .collect(Collectors.toMap(newOrderingSymbols::get, 
newSortOrders::get)));
+     for (PlanNode planNode : resultTableScanNodeList) {
+       TableScanNode scanNode = (TableScanNode) planNode;
+       nodeOrderingMap.put(scanNode.getPlanNodeId(), newOrderingScheme);
+       scanNode.getDeviceEntries().sort(comparator);
+     }
+   }
+ 
+   // ------------------- schema related interface 
---------------------------------------------
+ 
+   @Override
+   public List<PlanNode> visitSchemaQueryMerge(SchemaQueryMergeNode node, 
PlanContext context) {
+     return Collections.singletonList(
+         addExchangeNodeForSchemaMerge(rewriteSchemaQuerySource(node, 
context), context));
+   }
+ 
+   private SchemaQueryMergeNode rewriteSchemaQuerySource(
+       SchemaQueryMergeNode node, PlanContext context) {
+     SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
+ 
+     String database = ((TableDeviceSourceNode) 
node.getChildren().get(0)).getDatabase();
++    database = CommonUtils.qualifyDatabaseName(database);
+     Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
+     analysis
+         .getSchemaPartitionInfo()
+         .getSchemaPartitionMap()
+         .get(database)
+         .forEach(
+             (deviceGroupId, schemaRegionReplicaSet) -> 
schemaRegionSet.add(schemaRegionReplicaSet));
+ 
+     for (PlanNode child : node.getChildren()) {
+       for (TRegionReplicaSet schemaRegion : schemaRegionSet) {
+         SourceNode clonedChild = (SourceNode) child.clone();
+         clonedChild.setPlanNodeId(queryId.genPlanNodeId());
+         clonedChild.setRegionReplicaSet(schemaRegion);
+         root.addChild(clonedChild);
+       }
+     }
+     return root;
+   }
+ 
+   private PlanNode addExchangeNodeForSchemaMerge(
+       AbstractSchemaMergeNode node, PlanContext context) {
+     node.getChildren()
+         .forEach(
+             child ->
+                 context.putNodeDistribution(
+                     child.getPlanNodeId(),
+                     new NodeDistribution(
+                         NodeDistributionType.NO_CHILD,
+                         ((SourceNode) child).getRegionReplicaSet())));
+     NodeDistribution nodeDistribution =
+         new 
NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
+     PlanNode newNode = node.clone();
+     
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), 
context));
+     context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
+     node.getChildren()
+         .forEach(
+             child -> {
+               if (!nodeDistribution
+                   .getRegion()
+                   
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
+                 ExchangeNode exchangeNode = new 
ExchangeNode(queryId.genPlanNodeId());
+                 exchangeNode.setChild(child);
+                 
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+                 context.hasExchangeNode = true;
+                 newNode.addChild(exchangeNode);
+               } else {
+                 newNode.addChild(child);
+               }
+             });
+     return newNode;
+   }
+ 
+   private TRegionReplicaSet calculateSchemaRegionByChildren(
+       List<PlanNode> children, PlanContext context) {
+     // We always make the schemaRegion of the SchemaMergeNode the same as that of
+     // its first child.
+     return 
context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
+   }
+ 
+   public static class PlanContext {
+     final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
+     boolean hasExchangeNode = false;
+     boolean hasSortNode = false;
+     OrderingScheme expectedOrderingScheme;
+     TRegionReplicaSet mostUsedDataRegion;
+ 
+     public PlanContext() {
+       this.nodeDistributionMap = new HashMap<>();
+     }
+ 
+     public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
+       return this.nodeDistributionMap.get(nodeId);
+     }
+ 
+     public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution 
distribution) {
+       this.nodeDistributionMap.put(nodeId, distribution);
+     }
+   }
+ }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 843eace5282,70a87f06dec..038e4fac02e
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@@ -81,11 -84,9 +86,11 @@@ public class TableDistributionPlanner 
      List<FragmentInstance> fragmentInstances =
          mppQueryContext.getQueryType() == QueryType.READ
              ? new TableModelQueryFragmentPlanner(subPlan, analysis, 
mppQueryContext).plan()
 -            : new WriteFragmentParallelPlanner(subPlan, analysis, 
mppQueryContext).parallelPlan();
 +            : new WriteFragmentParallelPlanner(
 +                    subPlan, analysis, mppQueryContext, 
WritePlanNode::splitByPartition)
 +                .parallelPlan();
  
-     // Only execute this step for READ operation
+     // only execute this step for READ operation
      if (mppQueryContext.getQueryType() == QueryType.READ) {
        setSinkForRootInstance(subPlan, fragmentInstances);
      }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CreateTableDeviceNode.java
index 13209c2274f,6de1ef903ce..a01e6b1c5df
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CreateTableDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CreateTableDeviceNode.java
@@@ -122,16 -151,16 +151,15 @@@ public class CreateTableDeviceNode exte
  
    public List<IDeviceID> getPartitionKeyList() {
      if (partitionKeyList == null) {
-       List<IDeviceID> partitionKeyList = new ArrayList<>();
+       List<IDeviceID> tmpPartitionKeyList = new ArrayList<>();
        for (Object[] rawId : deviceIdList) {
--        String[] partitionKey = new String[rawId.length + 1];
--        partitionKey[0] = tableName;
++        String[] partitionKey = new String[rawId.length];
          for (int i = 0; i < rawId.length; i++) {
-           partitionKey[i + 1] = Objects.toString(rawId[i].toString());
 -          partitionKey[i + 1] = (String) rawId[i];
++          partitionKey[i] = (String) rawId[i];
          }
-         
partitionKeyList.add(IDeviceID.Factory.DEFAULT_FACTORY.create(partitionKey));
+         
tmpPartitionKeyList.add(IDeviceID.Factory.DEFAULT_FACTORY.create(partitionKey));
        }
-       this.partitionKeyList = partitionKeyList;
+       this.partitionKeyList = tmpPartitionKeyList;
      }
      return partitionKeyList;
    }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 4baa449773f,98b10ab23ce..f52eeff656d
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@@ -248,17 -242,6 +244,17 @@@ public class PushPredicateIntoTableSca
        return node;
      }
  
 +    @Override
-     public PlanNode visitInsertTablet(InsertTabletNode node, RewriterContext 
context) {
++    public PlanNode visitInsertTablet(InsertTabletNode node, Void context) {
 +      return node;
 +    }
 +
 +    @Override
 +    public PlanNode visitRelationalInsertTablet(
-         RelationalInsertTabletNode node, RewriterContext context) {
++        RelationalInsertTabletNode node, Void context) {
 +      return node;
 +    }
 +
      /** Get deviceEntries and DataPartition used in TableScan. */
      private void tableMetadataIndexScan(TableScanNode node, List<Expression> 
metadataExpressions) {
        List<String> attributeColumns =
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index cbadd577eea,9b902d0cf44..26e04803f5a
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@@ -402,76 -397,4 +402,83 @@@ public class CommonUtils 
      System.err.println("-- StackTrace --");
      System.err.println(Throwables.getStackTraceAsString(e));
    }
 +
 +  public static String[] deviceIdToStringArray(IDeviceID deviceID) {
 +    String[] ret = new String[deviceID.segmentNum()];
 +    for (int i = 0; i < ret.length; i++) {
 +      ret[i] = deviceID.segment(i).toString();
 +    }
 +    return ret;
 +  }
 +
 +  public static Object[] deviceIdToObjArray(IDeviceID deviceID) {
 +    Object[] ret = new Object[deviceID.segmentNum()];
 +    for (int i = 0; i < ret.length; i++) {
 +      ret[i] = deviceID.segment(i);
 +    }
 +    return ret;
 +  }
 +
 +  /**
 +   * Check whether the time falls in TTL.
 +   *
 +   * @return whether the given time falls in ttl
 +   */
 +  public static boolean isAlive(long time, long dataTTL) {
 +    return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - 
time) <= dataTTL;
 +  }
 +
 +  public static Object createValueColumnOfDataType(
 +      TSDataType dataType, TsTableColumnCategory columnCategory, int rowNum) {
 +    Object valueColumn;
 +    switch (dataType) {
 +      case INT32:
 +        valueColumn = new int[rowNum];
 +        break;
 +      case INT64:
 +      case TIMESTAMP:
 +        valueColumn = new long[rowNum];
 +        break;
 +      case FLOAT:
 +        valueColumn = new float[rowNum];
 +        break;
 +      case DOUBLE:
 +        valueColumn = new double[rowNum];
 +        break;
 +      case BOOLEAN:
 +        valueColumn = new boolean[rowNum];
 +        break;
 +      case TEXT:
 +      case STRING:
 +        if (columnCategory.equals(TsTableColumnCategory.MEASUREMENT)) {
 +          valueColumn = new Binary[rowNum];
 +        } else {
 +          valueColumn = new String[rowNum];
 +        }
 +        break;
 +      case BLOB:
 +        valueColumn = new Binary[rowNum];
 +        break;
 +      case DATE:
 +        valueColumn = new LocalDate[rowNum];
 +        break;
 +      default:
 +        throw new UnSupportedDataTypeException(
 +            String.format("Data type %s is not supported.", dataType));
 +    }
 +    return valueColumn;
 +  }
 +
 +  public static void swapArray(Object[] array, int i, int j) {
 +    Object tmp = array[i];
 +    array[i] = array[j];
 +    array[j] = tmp;
 +  }
++
++  public static String qualifyDatabaseName(String databaseName) {
++    if (databaseName != null && !databaseName.startsWith("root.")) {
++      databaseName = "root." + databaseName;
++    }
++    return databaseName;
++  }
  }
diff --cc 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 292586672a3,4ff1d6c2b03..55e6bbbc251
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@@ -37,8 -28,7 +37,9 @@@ import org.apache.iotdb.db.queryengine.
  import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
  import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
  import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
  import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle;
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@@ -648,182 -759,13 +777,182 @@@ public class AnalyzerTest 
      assertEquals(3, offsetNode.getCount());
    }
  
 +  @Test
 +  public void sortTest() {
 +    // when TableScan locates multi regions, use default MergeSortNode
 +    sql = "SELECT * FROM table1 ";
 +    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
 +    actualAnalysis = analyzeSQL(sql, metadata);
 +    logicalQueryPlan =
 +        new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
 +            .plan(actualAnalysis);
 +    rootNode = logicalQueryPlan.getRootNode();
 +  }
 +
 +  private Metadata mockMetadataForInsertion() {
 +    return new TestMatadata() {
 +      @Override
-       public TableSchema validateTableHeaderSchema(
++      public Optional<TableSchema> validateTableHeaderSchema(
 +          String database, TableSchema schema, MPPQueryContext context) {
 +        TableSchema tableSchema = StatementTestUtils.genTableSchema();
 +        assertEquals(tableSchema, schema);
-         return tableSchema;
++        return Optional.of(tableSchema);
 +      }
 +
 +      @Override
 +      public void validateDeviceSchema(
 +          ITableDeviceSchemaValidation schemaValidation, MPPQueryContext 
context) {
 +        assertEquals(sessionInfo.getDatabaseName().get(), 
schemaValidation.getDatabase());
 +        assertEquals(StatementTestUtils.tableName(), 
schemaValidation.getTableName());
 +        Object[] columns = StatementTestUtils.genColumns();
 +        for (int i = 0; i < schemaValidation.getDeviceIdList().size(); i++) {
 +          Object[] objects = schemaValidation.getDeviceIdList().get(i);
 +          assertEquals(objects[0].toString(), StatementTestUtils.tableName());
 +          assertEquals(objects[1].toString(), ((String[]) columns[0])[i]);
 +        }
 +        List<String> attributeColumnNameList = 
schemaValidation.getAttributeColumnNameList();
 +        assertEquals(Collections.singletonList("attr1"), 
attributeColumnNameList);
 +        assertEquals(1, schemaValidation.getAttributeValueList().size());
 +        for (int i = 0; i < schemaValidation.getAttributeValueList().size(); 
i++) {
 +          assertEquals(
 +              ((Object[]) columns[1])[i], ((Object[]) 
schemaValidation.getAttributeValueList().get(0))[i]);
 +        }
 +      }
 +
 +      @Override
 +      public DataPartition getOrCreateDataPartition(
 +          List<DataPartitionQueryParam> dataPartitionQueryParams, String 
userName) {
 +        int seriesSlotNum = StatementTestUtils.TEST_SERIES_SLOT_NUM;
 +        String partitionExecutorName = 
StatementTestUtils.TEST_PARTITION_EXECUTOR;
 +        SeriesPartitionExecutor seriesPartitionExecutor =
 +            SeriesPartitionExecutor.getSeriesPartitionExecutor(
 +                partitionExecutorName, seriesSlotNum);
 +
 +        Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
 +            dataPartitionMap = new HashMap<>();
 +
 +        for (DataPartitionQueryParam dataPartitionQueryParam : 
dataPartitionQueryParams) {
 +          String databaseName = dataPartitionQueryParam.getDatabaseName();
 +          assertEquals(sessionInfo.getDatabaseName().get(), databaseName);
 +
 +          String tableName = 
dataPartitionQueryParam.getDeviceID().getTableName();
 +          assertEquals(StatementTestUtils.tableName(), tableName);
 +
 +          TSeriesPartitionSlot partitionSlot =
 +              seriesPartitionExecutor.getSeriesPartitionSlot(
 +                  dataPartitionQueryParam.getDeviceID());
 +          for (TTimePartitionSlot tTimePartitionSlot :
 +              dataPartitionQueryParam.getTimePartitionSlotList()) {
 +            dataPartitionMap
 +                .computeIfAbsent(databaseName, d -> new HashMap<>())
 +                .computeIfAbsent(partitionSlot, slot -> new HashMap<>())
 +                .computeIfAbsent(tTimePartitionSlot, slot -> new 
ArrayList<>())
 +                .add(
 +                    new TRegionReplicaSet(
 +                        new TConsensusGroupId(
 +                            TConsensusGroupType.DataRegion, 
partitionSlot.slotId),
 +                        Collections.singletonList(
 +                            new TDataNodeLocation(
 +                                partitionSlot.slotId, null, null, null, null, 
null))));
 +          }
 +        }
 +        return new DataPartition(dataPartitionMap, partitionExecutorName, 
seriesSlotNum);
 +      }
 +    };
 +  }
 +
 +  @Test
 +  public void analyzeInsertTablet() {
 +    Metadata mockMetadata = mockMetadataForInsertion();
 +
 +    InsertTabletStatement insertTabletStatement = 
StatementTestUtils.genInsertTabletStatement(true);
 +    context = new MPPQueryContext("", queryId, sessionInfo, null, null);
 +    actualAnalysis =
 +        analyzeStatement(
 +            insertTabletStatement.toRelationalStatement(context),
 +            mockMetadata,
 +            new SqlParser(),
 +            sessionInfo);
 +    assertEquals(1, 
actualAnalysis.getDataPartition().getDataPartitionMap().size());
 +    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> partitionSlotMapMap = 
actualAnalysis.getDataPartition()
 +        
.getDataPartitionMap().get(sessionInfo.getDatabaseName().orElse(null));
 +    assertEquals(3, partitionSlotMapMap.size());
 +
 +    logicalQueryPlan =
 +        new LogicalPlanner(context, mockMetadata, sessionInfo, 
WarningCollector.NOOP)
 +            .plan(actualAnalysis);
 +
 +    RelationalInsertTabletNode insertTabletNode =
 +        (RelationalInsertTabletNode) logicalQueryPlan.getRootNode();
 +
 +    assertEquals(insertTabletNode.getTableName(), 
StatementTestUtils.tableName());
 +    assertEquals(3, insertTabletNode.getRowCount());
 +    Object[] columns = StatementTestUtils.genColumns();
 +    for (int i = 0; i < insertTabletNode.getRowCount(); i++) {
 +      assertEquals(
 +          Factory.DEFAULT_FACTORY.create(
 +              new String[] {StatementTestUtils.tableName(), ((String[]) 
columns[0])[i]}),
 +          insertTabletNode.getDeviceID(i));
 +    }
 +    assertArrayEquals(columns, insertTabletNode.getColumns());
 +    assertArrayEquals(StatementTestUtils.genTimestamps(), 
insertTabletNode.getTimes());
 +
 +    distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
 +    distributedQueryPlan = distributionPlanner.plan();
 +    assertEquals(3, distributedQueryPlan.getInstances().size());
 +  }
 +
 +  @Test
 +  public void analyzeInsertRow() {
 +    Metadata mockMetadata = mockMetadataForInsertion();
 +
 +    InsertRowStatement insertStatement = 
StatementTestUtils.genInsertRowStatement(true);
 +    context = new MPPQueryContext("", queryId, sessionInfo, null, null);
 +    actualAnalysis =
 +        analyzeStatement(
 +            insertStatement.toRelationalStatement(context),
 +            mockMetadata,
 +            new SqlParser(),
 +            sessionInfo);
 +    assertEquals(1, 
actualAnalysis.getDataPartition().getDataPartitionMap().size());
 +    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> partitionSlotMapMap = 
actualAnalysis.getDataPartition()
 +        
.getDataPartitionMap().get(sessionInfo.getDatabaseName().orElse(null));
 +    assertEquals(1, partitionSlotMapMap.size());
 +
 +    logicalQueryPlan =
 +        new LogicalPlanner(context, mockMetadata, sessionInfo, 
WarningCollector.NOOP)
 +            .plan(actualAnalysis);
 +
 +    RelationalInsertRowNode insertNode =
 +        (RelationalInsertRowNode) logicalQueryPlan.getRootNode();
 +
 +    assertEquals(insertNode.getTableName(), StatementTestUtils.tableName());
 +    Object[] columns = StatementTestUtils.genValues(0);
 +    assertEquals(
 +        Factory.DEFAULT_FACTORY.create(
 +            new String[] {StatementTestUtils.tableName(), ((String) 
columns[0])}),
 +        insertNode.getDeviceID());
 +
 +    assertArrayEquals(columns, insertNode.getValues());
 +    assertEquals(StatementTestUtils.genTimestamps()[0], insertNode.getTime());
 +
 +    distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
 +    distributedQueryPlan = distributionPlanner.plan();
 +    assertEquals(1, distributedQueryPlan.getInstances().size());
 +  }
 +
    public static Analysis analyzeSQL(String sql, Metadata metadata) {
 +    SqlParser sqlParser = new SqlParser();
 +    Statement statement = sqlParser.createStatement(sql, 
ZoneId.systemDefault());
 +    SessionInfo session =
 +        new SessionInfo(
 +            0, "test", ZoneId.systemDefault(), "testdb", 
IClientSession.SqlDialect.TABLE);
 +    return analyzeStatement(statement, metadata, sqlParser, session);
 +  }
 +
 +  public static Analysis analyzeStatement(
 +      Statement statement, Metadata metadata, SqlParser sqlParser, 
SessionInfo session) {
      try {
 -      SqlParser sqlParser = new SqlParser();
 -      Statement statement = sqlParser.createStatement(sql, 
ZoneId.systemDefault());
 -      SessionInfo session =
 -          new SessionInfo(
 -              0, "test", ZoneId.systemDefault(), "testdb", 
IClientSession.SqlDialect.TABLE);
        StatementAnalyzerFactory statementAnalyzerFactory =
            new StatementAnalyzerFactory(metadata, sqlParser, nopAccessControl);
  
diff --cc iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 16c728caf81,16c728caf81..cf5c3575674
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@@ -1075,6 -1075,6 +1075,11 @@@ service IConfigNodeRPCService 
     */
    TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq req)
  
++  /**
++  * Get SchemaPartitionTable by specific database name and series slots.
++  **/
++  TSchemaPartitionTableResp getSchemaPartitionTableWithSlots(map<string, 
list<common.TSeriesPartitionSlot>> dbSlotMap)
++
    /**
     * Get or create SchemaPartitionTable by specific PathPatternTree,
     * the returned SchemaPartitionTable always contains all the 
SeriesPartitionSlots
@@@ -1086,6 -1086,6 +1091,11 @@@
     */
    TSchemaPartitionTableResp 
getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
  
++  /**
++   * Get or create SchemaPartitionTable by specific database name and series 
slots.
++   **/
++   TSchemaPartitionTableResp 
getOrCreateSchemaPartitionTableWithSlots(map<string, 
list<common.TSeriesPartitionSlot>> dbSlotMap)
++
    // ======================================================
    // Node Management
    // ======================================================

Reply via email to