This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch TableModelIngestion in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac19d1d238c7d719ee62a4aa95e6e4d9d94c8d37 Merge: 7313ad4850c 180e8fb422b Author: Tian Jiang <[email protected]> AuthorDate: Tue Jul 9 16:57:30 2024 +0800 Merge branch 'ty/TableModelGrammar' into TableModelIngestion # Conflicts: # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java # iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java .../iotdb/session/it/IoTDBSessionRelationalIT.java | 2 +- .../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 48 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 + .../iotdb/confignode/manager/ConfigManager.java | 22 + .../apache/iotdb/confignode/manager/IManager.java | 9 + .../iotdb/confignode/manager/ProcedureManager.java | 183 +- .../PipeConfigPhysicalPlanTSStatusVisitor.java | 16 +- .../manager/schema/ClusterSchemaManager.java | 18 +- .../confignode/persistence/schema/ConfigMTree.java | 38 +- .../impl/schema/DeleteDatabaseProcedure.java | 4 + .../impl/schema/table/AddTableColumnProcedure.java | 13 +- .../state/schema/AddTableColumnState.java | 4 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 8 + .../metadata/DatabaseAlreadySetException.java | 29 +- ...ception.java => DatabaseConflictException.java} | 26 +- .../legacy/IoTDBLegacyPipeReceiverAgent.java | 104 +- .../iotdb/db/protocol/client/ConfigNodeClient.java | 17 + .../iotdb/db/protocol/session/IClientSession.java | 1 + .../impl/DataNodeInternalRPCServiceImpl.java | 7 + .../source/relational/TableScanOperator.java | 4 +- .../plan/analyze/ClusterPartitionFetcher.java | 52 +- .../plan/analyze/LoadTsfileAnalyzer.java | 7 +- .../analyze/cache/partition/PartitionCache.java | 50 +- .../cache/schema/dualkeycache/IDualKeyCache.java | 13 + .../schema/dualkeycache/impl/DualKeyCacheImpl.java | 37 + .../plan/analyze/schema/SchemaValidator.java | 5 +- .../relational/AlterTableAddColumnTask.java | 3 + .../TableModelStatementMemorySourceVisitor.java | 30 +- .../plan/planner/LocalExecutionPlanner.java | 13 +- .../plan/planner/OperatorTreeGenerator.java | 1 + .../plan/planner/plan/node/PlanNode.java | 2 +- .../plan/planner/plan/node/PlanNodeType.java | 1 + .../planner/plan/parameter/SeriesScanOptions.java | 11 + .../plan/relational/metadata/ColumnSchema.java | 1 + .../plan/relational/metadata/Metadata.java | 24 +- .../plan/relational/metadata/MetadataUtil.java | 2 +- .../relational/metadata/TableMetadataImpl.java | 8 +- .../metadata/fetcher/TableDeviceSchemaFetcher.java | 114 +- .../fetcher/TableDeviceSchemaValidator.java | 122 +- .../fetcher/TableHeaderSchemaValidator.java | 118 +- .../fetcher/cache/TableDeviceCacheEntry.java | 25 +- .../metadata/fetcher/cache/TableDeviceId.java | 12 +- .../fetcher/cache/TableDeviceSchemaCache.java | 11 +- .../relational/metadata/fetcher/cache/TableId.java | 16 +- .../plan/relational/planner/LogicalPlanner.java | 35 +- .../plan/relational/planner/RelationPlanner.java | 22 +- .../plan/relational/planner/Symbol.java | 5 + .../planner/distribute/AddExchangeNodes.java | 94 + .../distribute/DistributedPlanGenerator.java | 495 +++++ .../planner/distribute/ExchangeNodeGenerator.java | 219 -- .../planner/distribute/SimplePlanRewriter.java | 44 - .../distribute/TableDistributionPlanner.java | 47 +- .../TableModelTypeProviderExtractor.java | 7 + .../plan/relational/planner/node/CollectNode.java | 20 + .../planner/node/CreateTableDeviceNode.java | 50 +- .../plan/relational/planner/node/FilterNode.java | 21 + .../plan/relational/planner/node/LimitNode.java | 22 +- .../relational/planner/node/MergeSortNode.java | 21 + .../plan/relational/planner/node/OffsetNode.java | 20 + .../plan/relational/planner/node/OutputNode.java | 43 +- .../plan/relational/planner/node/ProjectNode.java | 20 + .../plan/relational/planner/node/SortNode.java | 44 +- .../relational/planner/node/StreamSortNode.java | 43 +- .../relational/planner/node/TableScanNode.java | 68 +- .../plan/relational/planner/node/TopKNode.java | 59 +- .../planner/optimizations/PruneUnUsedColumns.java | 16 +- .../optimizations/PushPredicateIntoTableScan.java | 95 +- .../RemoveRedundantIdentityProjections.java | 2 +- .../planner/optimizations/SimplifyExpressions.java | 2 +- ...lPlanOptimizer.java => TablePlanOptimizer.java} | 2 +- .../attribute/DeviceAttributeStore.java | 28 +- .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 4 +- .../read/resp/info/impl/ShowDevicesResult.java | 3 +- .../db/schemaengine/table/DataNodeTableCache.java | 13 + .../iotdb/db/schemaengine/table/ITableCache.java | 5 + .../org/apache/iotdb/db/utils/CommonUtils.java | 7 + .../plan/relational/analyzer/AnalyzerTest.java | 189 +- .../analyzer/MockTableModelDataPartition.java | 166 ++ .../relational/analyzer/MockTablePartition.java | 174 -- .../plan/relational/analyzer/SortTest.java | 2251 ++++++++++++++++++++ .../plan/relational/analyzer/TestMatadata.java | 35 +- .../iotdb/commons/partition/DataPartition.java | 17 +- .../apache/iotdb/commons/schema/table/TsTable.java | 90 +- .../table/column/TsTableColumnSchemaUtil.java | 2 +- .../src/main/thrift/confignode.thrift | 10 + 85 files changed, 4605 insertions(+), 1139 deletions(-) diff --cc integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java index a7439f8b1af,00000000000..e6ad59da345 mode 100644,000000..100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java @@@ -1,174 -1,0 +1,174 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.it; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.iotdb.session.Session; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.record.Tablet.ColumnType; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT; +import static org.junit.Assert.assertEquals; + +@RunWith(IoTDBTestRunner.class) +public class IoTDBSessionRelationalIT { + + private static Logger LOGGER = LoggerFactory.getLogger(IoTDBSessionRelationalIT.class); + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + try (ISession session = EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) { + session.executeNonQueryStatement("CREATE DATABASE db1"); + } + } + + @After + public void tearDown() throws Exception { + try (ISession session = EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) { + session.executeNonQueryStatement("DROP DATABASE db1"); + } + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + public static void main(String[] args) + throws IoTDBConnectionException, StatementExecutionException { + try (ISession session = + new Session.Builder().host("127.0.0.1").port(6667).sqlDialect(TABLE_SQL_DIALECT).build()) { + session.open(); + try { + session.executeNonQueryStatement("DROP DATABASE db1"); + } catch (Exception ignored) { + + } + session.executeNonQueryStatement("CREATE DATABASE db1"); - session.executeNonQueryStatement("USE db1"); ++ session.executeNonQueryStatement("USE \"db1\""); + session.executeNonQueryStatement( + "CREATE TABLE table1 (id1 string id, attr1 string attribute, " + + "m1 double " + + "measurement)"); + + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("id1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE)); + final List<ColumnType> columnTypes = + Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT); + + Tablet tablet = new Tablet("table1", schemaList, columnTypes, 10); + + long timestamp = System.currentTimeMillis(); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + row); + tablet.addValue("id1", rowIndex, "id:" + row); + tablet.addValue("attr1", rowIndex, "attr:" + row); + tablet.addValue("m1", rowIndex, row * 1.0); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertRelationalTablet(tablet, true); + tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + session.insertRelationalTablet(tablet); + tablet.reset(); + } + + SessionDataSet dataSet = session.executeQueryStatement("select count(*) from table1"); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(15L, rowRecord.getFields().get(0).getLongV()); + } + } + } + + @Test + @Category({LocalStandaloneIT.class, ClusterIT.class}) + public void insertRelationalTabletTest() + throws IoTDBConnectionException, StatementExecutionException { + try (ISession session = EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) { + session.executeNonQueryStatement("USE db1"); + session.executeNonQueryStatement( + "CREATE TABLE table1 (id1 string id, attr1 string attribute, " + + "m1 double " + + "measurement)"); + + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("id1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE)); + final List<ColumnType> columnTypes = + Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT); + + Tablet tablet = new Tablet("table1", schemaList, columnTypes, 10); + + long timestamp = System.currentTimeMillis(); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + row); + tablet.addValue("id1", rowIndex, "id:" + row); + tablet.addValue("attr1", rowIndex, "attr:" + row); + tablet.addValue("m1", rowIndex, row * 1.0); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertRelationalTablet(tablet, true); + tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + session.insertRelationalTablet(tablet); + tablet.reset(); + } + + SessionDataSet dataSet = session.executeQueryStatement("select count(*) from table1"); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(15L, rowRecord.getFields().get(0).getLongV()); + } + } + } +} diff --cc iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index fddc92e40e9,fddc92e40e9..b3a91e4aa6a --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@@ -754,6 -754,6 +754,28 @@@ public class ConfigManager implements I return resp; } ++ @Override ++ public TSchemaPartitionTableResp getSchemaPartition( ++ Map<String, List<TSeriesPartitionSlot>> dbSlotMap) { ++ // Construct empty response ++ TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp(); ++ // Return empty resp if the partitionSlotsMap is empty ++ if (dbSlotMap.isEmpty()) { ++ return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new HashMap<>()); ++ } ++ ++ GetSchemaPartitionPlan getSchemaPartitionPlan = ++ new GetSchemaPartitionPlan( ++ dbSlotMap.entrySet().stream() ++ .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue())))); ++ SchemaPartitionResp queryResult = partitionManager.getSchemaPartition(getSchemaPartitionPlan); ++ resp = queryResult.convertToRpcSchemaPartitionTableResp(); ++ ++ LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", dbSlotMap, resp); ++ ++ return resp; ++ } ++ @Override public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patternTree) { // Construct empty response diff --cc iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 98695d1eb7b,98695d1eb7b..c280f0fa2a8 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@@ -19,10 -19,10 +19,12 @@@ package org.apache.iotdb.confignode.manager; ++import java.util.Map; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; ++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp; @@@ -355,6 -355,6 +357,13 @@@ public interface IManager */ TSchemaPartitionTableResp getSchemaPartition(PathPatternTree patternTree); ++ /** ++ * Get SchemaPartition with <databaseName, seriesSlot>. ++ * ++ * @return TSchemaPartitionResp ++ */ ++ TSchemaPartitionTableResp getSchemaPartition(Map<String, List<TSeriesPartitionSlot>> dbSlotMap); ++ /** * Get or create SchemaPartition. * diff --cc iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 28978ab6648,28978ab6648..df2445c67cf --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@@ -19,11 -19,11 +19,13 @@@ package org.apache.iotdb.confignode.service.thrift; ++import java.util.Map; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; import org.apache.iotdb.common.rpc.thrift.TSStatus; ++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; @@@ -530,6 -530,6 +532,12 @@@ public class ConfigNodeRPCServiceProces return configManager.getOrCreateSchemaPartition(patternTree); } ++ @Override ++ public TSchemaPartitionTableResp getOrCreateSchemaPartitionTableWithSlots( ++ Map<String, TSeriesPartitionSlot> dbSlotMap) throws TException { ++ return configManager.geto; ++ } ++ @Override public TSchemaNodeManagementResp getSchemaNodeManagementPartition(TSchemaNodeManagementReq req) { PathPatternTree patternTree = diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 0b5ec89dacf,0b5ec89dacf..de4b063fe79 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@@ -19,12 -19,12 +19,14 @@@ package org.apache.iotdb.db.protocol.client; ++import java.util.Map; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; import org.apache.iotdb.common.rpc.thrift.TSStatus; ++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; @@@ -527,6 -527,6 +529,13 @@@ public class ConfigNodeClient implement () -> client.getSchemaPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status)); } ++ @Override ++ public TSchemaPartitionTableResp getSchemaPartitionTableWithSlots( ++ Map<String, List<TSeriesPartitionSlot>> dbSlotMap) throws TException { ++ return executeRemoteCallWithRetry( ++ () -> client.getSchemaPartitionTableWithSlots(dbSlotMap), resp -> !updateConfigNodeLeader(resp.status)); ++ } ++ @Override public TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req) throws TException { @@@ -535,6 -535,6 +544,14 @@@ resp -> !updateConfigNodeLeader(resp.status)); } ++ @Override ++ public TSchemaPartitionTableResp getOrCreateSchemaPartitionTableWithSlots( ++ Map<String, List<TSeriesPartitionSlot>> dbSlotMap) throws TException { ++ return executeRemoteCallWithRetry( ++ () -> client.getOrCreateSchemaPartitionTableWithSlots(dbSlotMap), ++ resp -> !updateConfigNodeLeader(resp.status)); ++ } ++ @Override public TSchemaNodeManagementResp getSchemaNodeManagementPartition(TSchemaNodeManagementReq req) throws TException { diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java index edba5195ce8,edba5195ce8..d50ab5e61d3 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java @@@ -20,6 -20,6 +20,7 @@@ package org.apache.iotdb.db.protocol.session; import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; ++import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java index 2f1e93d4598,1f4f63f6d2d..f3e634b054f --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java @@@ -97,23 -98,23 +98,44 @@@ public class ClusterPartitionFetcher im @Override public SchemaPartition getSchemaPartition(PathPatternTree patternTree) { -- return getOrCreateSchemaPartition(patternTree, false, null); ++ try (ConfigNodeClient client = ++ configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { ++ patternTree.constructTree(); ++ List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns(); ++ Map<String, List<IDeviceID>> storageGroupToDeviceMap = ++ partitionCache.getDatabaseToDevice(deviceIDs, true, false, null); ++ SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap); ++ if (null == schemaPartition) { ++ TSchemaPartitionTableResp schemaPartitionTableResp = ++ client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree)); ++ if (schemaPartitionTableResp.getStatus().getCode() ++ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { ++ schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp); ++ partitionCache.updateSchemaPartitionCache( ++ schemaPartitionTableResp.getSchemaPartitionTable()); ++ } else { ++ throw new RuntimeException( ++ new IoTDBException( ++ schemaPartitionTableResp.getStatus().getMessage(), ++ schemaPartitionTableResp.getStatus().getCode())); ++ } ++ } ++ return schemaPartition; ++ } catch (ClientManagerException | TException e) { ++ throw new StatementAnalyzeException( ++ "An error occurred when executing getSchemaPartition():" + e.getMessage()); ++ } } @Override public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree, String userName) { -- return getOrCreateSchemaPartition(patternTree, true, userName); -- } -- -- private SchemaPartition getOrCreateSchemaPartition( -- PathPatternTree patternTree, boolean isAutoCreate, String userName) { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { patternTree.constructTree(); List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns(); -- Map<String, List<IDeviceID>> databaseToDevice = -- partitionCache.getDatabaseToDevice(deviceIDs, true, isAutoCreate, userName); -- SchemaPartition schemaPartition = partitionCache.getSchemaPartition(databaseToDevice); ++ Map<String, List<IDeviceID>> storageGroupToDeviceMap = ++ partitionCache.getDatabaseToDevice(deviceIDs, true, true, userName); ++ SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap); if (null == schemaPartition) { TSchemaPartitionTableResp schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree)); @@@ -290,17 -291,12 +312,15 @@@ configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate, userName); SchemaPartition schemaPartition = - partitionCache.getSchemaPartition( - new HashMap<String, List<IDeviceID>>() { - { - put(database, deviceIDs); - } - }); + partitionCache.getSchemaPartition(Collections.singletonMap(database, deviceIDs)); if (null == schemaPartition) { PathPatternTree tree = new PathPatternTree(); -- tree.appendPathPattern(new PartialPath(database + "." + MULTI_LEVEL_PATH_WILDCARD)); ++// tree.appendPathPattern(new PartialPath(database + "." + MULTI_LEVEL_PATH_WILDCARD)); ++ for (IDeviceID deviceID : deviceIDs) { ++ tree.appendPathPattern(new PartialPath(deviceID.toString())); ++ } TSchemaPartitionTableResp schemaPartitionTableResp = - client.getSchemaPartitionTable(constructSchemaPartitionReq(tree)); + client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(tree)); if (schemaPartitionTableResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp); diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java index d1c1aac54d4,7bbdd5cefc6..63f9d3afd9c --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java @@@ -62,21 -55,6 +62,24 @@@ public class SchemaValidator } } + public static void validate( + Metadata metadata, WrappedInsertStatement insertStatement, MPPQueryContext context) { + try { + String databaseName = context.getSession().getDatabaseName().orElse(null); + final TableSchema incomingSchema = insertStatement.getTableSchema(); + final TableSchema realSchema = - metadata.validateTableHeaderSchema(databaseName, incomingSchema, context); ++ metadata.validateTableHeaderSchema(databaseName, incomingSchema, context).orElse(null); ++ if (realSchema == null) { ++ throw new SemanticException("Schema validation failed, table cannot be created: " + incomingSchema); ++ } + insertStatement.validate(realSchema); + metadata.validateDeviceSchema(insertStatement, context); + insertStatement.updateAfterSchemaValidation(context); + } catch (QueryProcessException e) { + throw new SemanticException(e.getMessage()); + } + } + public static ISchemaTree validate( ISchemaFetcher schemaFetcher, List<PartialPath> devicePaths, diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 3b74f1b4a08,3b74f1b4a08..979a0ce4c89 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@@ -136,7 -136,7 +136,18 @@@ public class LocalExecutionPlanner LocalExecutionPlanContext context = new LocalExecutionPlanContext(instanceContext, schemaRegion); -- Operator root = plan.accept(new OperatorTreeGenerator(), context); ++ Operator root; ++ IClientSession.SqlDialect sqlDialect = instanceContext.getSessionInfo().getSqlDialect(); ++ switch (sqlDialect) { ++ case TREE: ++ root = plan.accept(new OperatorTreeGenerator(), context); ++ break; ++ case TABLE: ++ root = plan.accept(new TableOperatorGenerator(metadata), context); ++ break; ++ default: ++ throw new IllegalArgumentException(String.format("Unknown sql dialect: %s", sqlDialect)); ++ } PipelineMemoryEstimator memoryEstimator = context.constructPipelineMemoryEstimator(root, null, plan, -1); diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 71dc05861d4,71dc05861d4..c937e3e9794 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@@ -186,6 -186,6 +186,7 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SeriesSchemaFetchScanNode; ++import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ActiveRegionScanMergeNode; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 7536c309bc6,91ab840732a..a9ece13473d --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@@ -241,8 -240,7 +241,9 @@@ public enum PlanNodeType TABLE_MERGESORT_NODE((short) 1007), TABLE_TOPK_NODE((short) 1008), TABLE_COLLECT_NODE((short) 1009), - TABLE_STREAM_SORT_NODE((short) 1010); ++ TABLE_STREAM_SORT_NODE((short) 1010), + + RELATIONAL_INSERT_TABLET((short) 2000); public static final int BYTES = Short.BYTES; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java index cef7ad3a799,a941dc2a272..f0fca77afcc --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java @@@ -73,16 -73,19 +74,20 @@@ public class TableHeaderSchemaValidato return TableHeaderSchemaValidatorHolder.INSTANCE; } - // This method return all the existing column schemas in the target table. - // When table or column is missing, this method will execute auto creation. - // When using SQL, the columnSchemaList could be null and there won't be any validation. - // All input column schemas will be validated and auto created when necessary. - // When the input dataType or category of one column is null, the column cannot be auto created. - public TableSchema validateTableHeaderSchema( + public Optional<TableSchema> validateTableHeaderSchema( String database, TableSchema tableSchema, MPPQueryContext context) { + // The schema cache R/W and fetch operation must be locked together thus the cache clean + // operation executed by delete timeseries will be effective. + DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION); + context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION); + List<ColumnSchema> inputColumnList = tableSchema.getColumns(); + if (inputColumnList == null || inputColumnList.isEmpty()) { + throw new IllegalArgumentException( + "Column List in TableSchema should never be null or empty."); + } TsTable table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); + LOGGER.info("Get TsTable from cache: {}", table); List<ColumnSchema> missingColumnList = new ArrayList<>(); List<ColumnSchema> resultColumnList = new ArrayList<>(); diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index 171159f57b6,ac17c3ff500..1aa2f80e0d0 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@@ -90,14 -88,25 +90,25 @@@ public class LogicalPlanner new RemoveRedundantIdentityProjections()); } + public LogicalPlanner( + MPPQueryContext context, + Metadata metadata, + SessionInfo sessionInfo, + List<TablePlanOptimizer> tablePlanOptimizers, + WarningCollector warningCollector) { + this.context = context; + this.metadata = metadata; + this.sessionInfo = requireNonNull(sessionInfo, "session is null"); + this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); + this.tablePlanOptimizers = tablePlanOptimizers; + } + public LogicalQueryPlan plan(Analysis analysis) { PlanNode planNode = planStatement(analysis, analysis.getStatement()); - - tablePlanOptimizers.forEach( - optimizer -> optimizer.optimize(planNode, analysis, metadata, sessionInfo, context)); -- + if (analysis.getStatement() instanceof Query) { - relationalPlanOptimizers.forEach( ++ tablePlanOptimizers.forEach( + optimizer -> optimizer.optimize(planNode, analysis, metadata, sessionInfo, context)); + } - return new LogicalQueryPlan(context, planNode); } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index da60edf80f7,d85fcc9056c..ac1ecdd2901 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@@ -48,7 -43,7 +49,8 @@@ import com.google.common.collect.Immuta import com.google.common.collect.ImmutableMap; import java.util.Collection; +import java.util.Collections; + import java.util.HashMap; import java.util.List; import java.util.Map; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java index 00000000000,0bf5bb32a1b..65041da0c69 mode 000000,100644..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java @@@ -1,0 -1,493 +1,495 @@@ + /* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; + + import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; + import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; + import org.apache.iotdb.db.queryengine.common.MPPQueryContext; + import org.apache.iotdb.db.queryengine.common.QueryId; + import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; + import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; + import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; + import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; + import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; + import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; + import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; + import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + + import java.util.ArrayList; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.function.Function; + import java.util.stream.Collectors; + import java.util.stream.IntStream; ++import org.apache.iotdb.db.utils.CommonUtils; + + import static com.google.common.collect.ImmutableList.toImmutableList; + import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction; + import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR; + + /** This class is used to generate distributed plan for table model. */ + public class DistributedPlanGenerator + extends PlanVisitor<List<PlanNode>, DistributedPlanGenerator.PlanContext> { + private final QueryId queryId; + private final Analysis analysis; + Map<PlanNodeId, OrderingScheme> nodeOrderingMap = new HashMap<>(); + + public DistributedPlanGenerator(MPPQueryContext queryContext, Analysis analysis) { + this.queryId = queryContext.getQueryId(); + this.analysis = analysis; + } + + public List<PlanNode> genResult(PlanNode node, PlanContext context) { + return node.accept(this, context); + } + + @Override + public List<PlanNode> visitPlan(PlanNode node, DistributedPlanGenerator.PlanContext context) { + if (node instanceof WritePlanNode) { + return Collections.singletonList(node); + } + + List<List<PlanNode>> children = + node.getChildren().stream() + .map(child -> child.accept(this, context)) + .collect(toImmutableList()); + + PlanNode newNode = node.clone(); + for (List<PlanNode> planNodes : children) { + planNodes.forEach(newNode::addChild); + } + return Collections.singletonList(newNode); + } + + @Override + public List<PlanNode> visitOutput(OutputNode node, PlanContext context) { + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); + if (childOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); + } + + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } + + @Override + public List<PlanNode> visitLimit(LimitNode node, PlanContext context) { + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); + if (childOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); + } + + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + // push down LimitNode in distributed plan optimize rule + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } + + @Override + public List<PlanNode> visitOffset(OffsetNode node, PlanContext context) { + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); + if (childOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); + } + + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } + + @Override + public List<PlanNode> visitProject(ProjectNode node, PlanContext context) { + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); + if (childOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); + } + + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + for (Expression expression : node.getAssignments().getMap().values()) { + if (containsDiffFunction(expression)) { + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } + } + + List<PlanNode> resultNodeList = new ArrayList<>(); + for (int i = 0; i < childrenNodes.size(); i++) { + PlanNode child = childrenNodes.get(i); + ProjectNode subProjectNode = + new ProjectNode(queryId.genPlanNodeId(), child, node.getAssignments()); + resultNodeList.add(subProjectNode); + if (i == 0) { + nodeOrderingMap.put(subProjectNode.getPlanNodeId(), childOrdering); + } + } + return resultNodeList; + } + + @Override + public List<PlanNode> visitSort(SortNode node, PlanContext context) { + context.expectedOrderingScheme = node.getOrderingScheme(); + context.hasSortNode = true; + nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme()); + + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + // may have ProjectNode above SortNode later, so use MergeSortNode but not return SortNode list + MergeSortNode mergeSortNode = + new MergeSortNode( + queryId.genPlanNodeId(), node.getOrderingScheme(), node.getOutputSymbols()); + for (PlanNode child : childrenNodes) { + SortNode subSortNode = + new SortNode(queryId.genPlanNodeId(), child, node.getOrderingScheme(), false); + mergeSortNode.addChild(subSortNode); + } + nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), mergeSortNode.getOrderingScheme()); + + return Collections.singletonList(mergeSortNode); + } + + @Override + public List<PlanNode> visitFilter(FilterNode node, PlanContext context) { + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); + if (childOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); + } + + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + if (containsDiffFunction(node.getPredicate())) { + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } + + List<PlanNode> resultNodeList = new ArrayList<>(); + for (int i = 0; i < childrenNodes.size(); i++) { + PlanNode child = childrenNodes.get(i); + FilterNode subFilterNode = + new FilterNode(queryId.genPlanNodeId(), child, node.getPredicate()); + resultNodeList.add(subFilterNode); + if (i == 0) { + nodeOrderingMap.put(subFilterNode.getPlanNodeId(), childOrdering); + } + } + return resultNodeList; + } + + @Override + public List<PlanNode> visitTableScan(TableScanNode node, PlanContext context) { + + Map<TRegionReplicaSet, TableScanNode> tableScanNodeMap = new HashMap<>(); + + for (DeviceEntry deviceEntry : node.getDeviceEntries()) { + List<TRegionReplicaSet> regionReplicaSets = + analysis + .getDataPartitionInfo() + .getDataRegionReplicaSetWithTimeFilter( + node.getQualifiedObjectName().getDatabaseName(), + deviceEntry.getDeviceID(), + node.getTimeFilter()); + for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) { + TableScanNode tableScanNode = + tableScanNodeMap.computeIfAbsent( + regionReplicaSet, + k -> { + TableScanNode scanNode = + new TableScanNode( + queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + new ArrayList<>(), + node.getIdAndAttributeIndexMap(), + node.getScanOrder(), + node.getTimePredicate().orElse(null), + node.getPushDownPredicate()); + scanNode.setRegionReplicaSet(regionReplicaSet); + return scanNode; + }); + tableScanNode.appendDeviceEntry(deviceEntry); + } + } + + context.hasExchangeNode = tableScanNodeMap.size() > 1; + + List<PlanNode> resultTableScanNodeList = new ArrayList<>(); + TRegionReplicaSet mostUsedDataRegion = null; + int maxDeviceEntrySizeOfTableScan = 0; + for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : tableScanNodeMap.entrySet()) { + TRegionReplicaSet regionReplicaSet = entry.getKey(); + TableScanNode subTableScanNode = entry.getValue(); + subTableScanNode.setPlanNodeId(queryId.genPlanNodeId()); + subTableScanNode.setRegionReplicaSet(regionReplicaSet); + resultTableScanNodeList.add(subTableScanNode); + + if (mostUsedDataRegion == null + || subTableScanNode.getDeviceEntries().size() > maxDeviceEntrySizeOfTableScan) { + mostUsedDataRegion = regionReplicaSet; + maxDeviceEntrySizeOfTableScan = subTableScanNode.getDeviceEntries().size(); + } + } + context.mostUsedDataRegion = mostUsedDataRegion; + + if (!context.hasSortNode) { + return resultTableScanNodeList; + } + + processSortProperty(node, resultTableScanNodeList, context); + return resultTableScanNodeList; + } + + private PlanNode mergeChildrenViaCollectOrMergeSort( + OrderingScheme childOrdering, List<PlanNode> childrenNodes) { + PlanNode firstChild = childrenNodes.get(0); + + // children has sort property, use MergeSort to merge children + if (childOrdering != null) { + MergeSortNode mergeSortNode = + new MergeSortNode(queryId.genPlanNodeId(), childOrdering, firstChild.getOutputSymbols()); + childrenNodes.forEach(mergeSortNode::addChild); + nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), childOrdering); + return mergeSortNode; + } + + // children has no sort property, use CollectNode to merge children + CollectNode collectNode = new CollectNode(queryId.genPlanNodeId()); + childrenNodes.forEach(collectNode::addChild); + return collectNode; + } + + private void processSortProperty( + TableScanNode tableScanNode, List<PlanNode> resultTableScanNodeList, PlanContext context) { + List<Symbol> newOrderingSymbols = new ArrayList<>(); + List<SortOrder> newSortOrders = new ArrayList<>(); + OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme; + + for (Symbol symbol : expectedOrderingScheme.getOrderBy()) { + if (TIMESTAMP_STR.equalsIgnoreCase(symbol.getName())) { + if (!expectedOrderingScheme.getOrderings().get(symbol).isAscending()) { + // TODO(beyyes) move scan order judgement into logical plan optimizer + resultTableScanNodeList.forEach( + node -> ((TableScanNode) node).setScanOrder(Ordering.DESC)); + } + break; + } else if (!tableScanNode.getIdAndAttributeIndexMap().containsKey(symbol)) { + break; + } + + newOrderingSymbols.add(symbol); + newSortOrders.add(expectedOrderingScheme.getOrdering(symbol)); + } + + // no sort property can be pushed down into TableScanNode + if (newOrderingSymbols.isEmpty()) { + return; + } + + List<Function<DeviceEntry, String>> orderingRules = new ArrayList<>(); + for (Symbol symbol : newOrderingSymbols) { + int idx = tableScanNode.getIdAndAttributeIndexMap().get(symbol); + if (tableScanNode.getAssignments().get(symbol).getColumnCategory() + == TsTableColumnCategory.ID) { + // segments[0] is always tableName + orderingRules.add(deviceEntry -> (String) deviceEntry.getDeviceID().getSegments()[idx + 1]); + } else { + orderingRules.add(deviceEntry -> deviceEntry.getAttributeColumnValues().get(idx)); + } + } + + Comparator<DeviceEntry> comparator; + if (newSortOrders.get(0).isNullsFirst()) { + comparator = + newSortOrders.get(0).isAscending() + ? Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))) + : Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))).reversed(); + } else { + comparator = + newSortOrders.get(0).isAscending() + ? Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))) + : Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))).reversed(); + } + for (int i = 1; i < orderingRules.size(); i++) { + Comparator<DeviceEntry> thenComparator; + if (newSortOrders.get(i).isNullsFirst()) { + thenComparator = + newSortOrders.get(i).isAscending() + ? Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))) + : Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))).reversed(); + } else { + thenComparator = + newSortOrders.get(i).isAscending() + ? Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))) + : Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))).reversed(); + } + comparator = comparator.thenComparing(thenComparator); + } + + OrderingScheme newOrderingScheme = + new OrderingScheme( + newOrderingSymbols, + IntStream.range(0, newOrderingSymbols.size()) + .boxed() + .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))); + for (PlanNode planNode : resultTableScanNodeList) { + TableScanNode scanNode = (TableScanNode) planNode; + nodeOrderingMap.put(scanNode.getPlanNodeId(), newOrderingScheme); + scanNode.getDeviceEntries().sort(comparator); + } + } + + // ------------------- schema related interface --------------------------------------------- + + @Override + public List<PlanNode> visitSchemaQueryMerge(SchemaQueryMergeNode node, PlanContext context) { + return Collections.singletonList( + addExchangeNodeForSchemaMerge(rewriteSchemaQuerySource(node, context), context)); + } + + private SchemaQueryMergeNode rewriteSchemaQuerySource( + SchemaQueryMergeNode node, PlanContext context) { + SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone(); + + String database = ((TableDeviceSourceNode) node.getChildren().get(0)).getDatabase(); ++ database = CommonUtils.qualifyDatabaseName(database); + Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>(); + analysis + .getSchemaPartitionInfo() + .getSchemaPartitionMap() + .get(database) + .forEach( + (deviceGroupId, schemaRegionReplicaSet) -> schemaRegionSet.add(schemaRegionReplicaSet)); + + for (PlanNode child : node.getChildren()) { + for (TRegionReplicaSet schemaRegion : schemaRegionSet) { + SourceNode clonedChild = (SourceNode) child.clone(); + clonedChild.setPlanNodeId(queryId.genPlanNodeId()); + clonedChild.setRegionReplicaSet(schemaRegion); + root.addChild(clonedChild); + } + } + return root; + } + + private PlanNode addExchangeNodeForSchemaMerge( + AbstractSchemaMergeNode node, PlanContext context) { + node.getChildren() + .forEach( + child -> + context.putNodeDistribution( + child.getPlanNodeId(), + new NodeDistribution( + NodeDistributionType.NO_CHILD, + ((SourceNode) child).getRegionReplicaSet()))); + NodeDistribution nodeDistribution = + new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN); + PlanNode newNode = node.clone(); + nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), context)); + context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution); + node.getChildren() + .forEach( + child -> { + if (!nodeDistribution + .getRegion() + .equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) { + ExchangeNode exchangeNode = new ExchangeNode(queryId.genPlanNodeId()); + exchangeNode.setChild(child); + exchangeNode.setOutputColumnNames(child.getOutputColumnNames()); + context.hasExchangeNode = true; + newNode.addChild(exchangeNode); + } else { + newNode.addChild(child); + } + }); + return newNode; + } + + private TRegionReplicaSet calculateSchemaRegionByChildren( + List<PlanNode> children, PlanContext context) { + // We always make the schemaRegion of SchemaMergeNode to be the same as its first child. + return context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion(); + } + + public static class PlanContext { + final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; + boolean hasExchangeNode = false; + boolean hasSortNode = false; + OrderingScheme expectedOrderingScheme; + TRegionReplicaSet mostUsedDataRegion; + + public PlanContext() { + this.nodeDistributionMap = new HashMap<>(); + } + + public NodeDistribution getNodeDistribution(PlanNodeId nodeId) { + return this.nodeDistributionMap.get(nodeId); + } + + public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) { + this.nodeDistributionMap.put(nodeId, distribution); + } + } + } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java index 843eace5282,70a87f06dec..038e4fac02e --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java @@@ -81,11 -84,9 +86,11 @@@ public class TableDistributionPlanner List<FragmentInstance> fragmentInstances = mppQueryContext.getQueryType() == QueryType.READ ? new TableModelQueryFragmentPlanner(subPlan, analysis, mppQueryContext).plan() - : new WriteFragmentParallelPlanner(subPlan, analysis, mppQueryContext).parallelPlan(); + : new WriteFragmentParallelPlanner( + subPlan, analysis, mppQueryContext, WritePlanNode::splitByPartition) + .parallelPlan(); - // Only execute this step for READ operation + // only execute this step for READ operation if (mppQueryContext.getQueryType() == QueryType.READ) { setSinkForRootInstance(subPlan, fragmentInstances); } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CreateTableDeviceNode.java index 13209c2274f,6de1ef903ce..a01e6b1c5df --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CreateTableDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CreateTableDeviceNode.java @@@ -122,16 -151,16 +151,15 @@@ public class CreateTableDeviceNode exte public List<IDeviceID> getPartitionKeyList() { if (partitionKeyList == null) { - List<IDeviceID> partitionKeyList = new ArrayList<>(); + List<IDeviceID> tmpPartitionKeyList = new ArrayList<>(); for (Object[] rawId : deviceIdList) { -- String[] partitionKey = new String[rawId.length + 1]; -- partitionKey[0] = tableName; ++ String[] partitionKey = new String[rawId.length]; for (int i = 0; i < rawId.length; i++) { - partitionKey[i + 1] = Objects.toString(rawId[i].toString()); - partitionKey[i + 1] = (String) rawId[i]; ++ partitionKey[i] = (String) rawId[i]; } - partitionKeyList.add(IDeviceID.Factory.DEFAULT_FACTORY.create(partitionKey)); + tmpPartitionKeyList.add(IDeviceID.Factory.DEFAULT_FACTORY.create(partitionKey)); } - this.partitionKeyList = partitionKeyList; + this.partitionKeyList = tmpPartitionKeyList; } return partitionKeyList; } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 4baa449773f,98b10ab23ce..f52eeff656d --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@@ -248,17 -242,6 +244,17 @@@ public class PushPredicateIntoTableSca return node; } + @Override - public PlanNode visitInsertTablet(InsertTabletNode node, RewriterContext context) { ++ public PlanNode visitInsertTablet(InsertTabletNode node, Void context) { + return node; + } + + @Override + public PlanNode visitRelationalInsertTablet( - RelationalInsertTabletNode node, RewriterContext context) { ++ RelationalInsertTabletNode node, Void context) { + return node; + } + /** Get deviceEntries and DataPartition used in TableScan. */ private void tableMetadataIndexScan(TableScanNode node, List<Expression> metadataExpressions) { List<String> attributeColumns = diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java index cbadd577eea,9b902d0cf44..26e04803f5a --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java @@@ -402,76 -397,4 +402,83 @@@ public class CommonUtils System.err.println("-- StackTrace --"); System.err.println(Throwables.getStackTraceAsString(e)); } + + public static String[] deviceIdToStringArray(IDeviceID deviceID) { + String[] ret = new String[deviceID.segmentNum()]; + for (int i = 0; i < ret.length; i++) { + ret[i] = deviceID.segment(i).toString(); + } + return ret; + } + + public static Object[] deviceIdToObjArray(IDeviceID deviceID) { + Object[] ret = new Object[deviceID.segmentNum()]; + for (int i = 0; i < ret.length; i++) { + ret[i] = deviceID.segment(i); + } + return ret; + } + + /** + * Check whether the time falls in TTL. + * + * @return whether the given time falls in ttl + */ + public static boolean isAlive(long time, long dataTTL) { + return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - time) <= dataTTL; + } + + public static Object createValueColumnOfDataType( + TSDataType dataType, TsTableColumnCategory columnCategory, int rowNum) { + Object valueColumn; + switch (dataType) { + case INT32: + valueColumn = new int[rowNum]; + break; + case INT64: + case TIMESTAMP: + valueColumn = new long[rowNum]; + break; + case FLOAT: + valueColumn = new float[rowNum]; + break; + case DOUBLE: + valueColumn = new double[rowNum]; + break; + case BOOLEAN: + valueColumn = new boolean[rowNum]; + break; + case TEXT: + case STRING: + if (columnCategory.equals(TsTableColumnCategory.MEASUREMENT)) { + valueColumn = new Binary[rowNum]; + } else { + valueColumn = new String[rowNum]; + } + break; + case BLOB: + valueColumn = new Binary[rowNum]; + break; + case DATE: + valueColumn = new LocalDate[rowNum]; + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } + return valueColumn; + } + + public static void swapArray(Object[] array, int i, int j) { + Object tmp = array[i]; + array[i] = array[j]; + array[j] = tmp; + } ++ ++ public static String qualifyDatabaseName(String databaseName) { ++ if (databaseName != null && !databaseName.startsWith("root.")) { ++ databaseName = "root." + databaseName; ++ } ++ return databaseName; ++ } } diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java index 292586672a3,4ff1d6c2b03..55e6bbbc251 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java @@@ -37,8 -28,7 +37,9 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; @@@ -648,182 -759,13 +777,182 @@@ public class AnalyzerTest assertEquals(3, offsetNode.getCount()); } + @Test + public void sortTest() { + // when TableScan locates multi regions, use default MergeSortNode + sql = "SELECT * FROM table1 "; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + actualAnalysis = analyzeSQL(sql, metadata); + logicalQueryPlan = + new LogicalPlanner(context, metadata, sessionInfo, WarningCollector.NOOP) + .plan(actualAnalysis); + rootNode = logicalQueryPlan.getRootNode(); + } + + private Metadata mockMetadataForInsertion() { + return new TestMatadata() { + @Override - public TableSchema validateTableHeaderSchema( ++ public Optional<TableSchema> validateTableHeaderSchema( + String database, TableSchema schema, MPPQueryContext context) { + TableSchema tableSchema = StatementTestUtils.genTableSchema(); + assertEquals(tableSchema, schema); - return tableSchema; ++ return Optional.of(tableSchema); + } + + @Override + public void validateDeviceSchema( + ITableDeviceSchemaValidation schemaValidation, MPPQueryContext context) { + assertEquals(sessionInfo.getDatabaseName().get(), schemaValidation.getDatabase()); + assertEquals(StatementTestUtils.tableName(), schemaValidation.getTableName()); + Object[] columns = StatementTestUtils.genColumns(); + for (int i = 0; i < schemaValidation.getDeviceIdList().size(); i++) { + Object[] objects = schemaValidation.getDeviceIdList().get(i); + assertEquals(objects[0].toString(), StatementTestUtils.tableName()); + assertEquals(objects[1].toString(), ((String[]) columns[0])[i]); + } + List<String> attributeColumnNameList = schemaValidation.getAttributeColumnNameList(); + assertEquals(Collections.singletonList("attr1"), attributeColumnNameList); + assertEquals(1, schemaValidation.getAttributeValueList().size()); + for (int i = 0; i < schemaValidation.getAttributeValueList().size(); i++) { + assertEquals( + ((Object[]) columns[1])[i], ((Object[]) schemaValidation.getAttributeValueList().get(0))[i]); + } + } + + @Override + public DataPartition getOrCreateDataPartition( + List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) { + int seriesSlotNum = StatementTestUtils.TEST_SERIES_SLOT_NUM; + String partitionExecutorName = StatementTestUtils.TEST_PARTITION_EXECUTOR; + SeriesPartitionExecutor seriesPartitionExecutor = + SeriesPartitionExecutor.getSeriesPartitionExecutor( + partitionExecutorName, seriesSlotNum); + + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + dataPartitionMap = new HashMap<>(); + + for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) { + String databaseName = dataPartitionQueryParam.getDatabaseName(); + assertEquals(sessionInfo.getDatabaseName().get(), databaseName); + + String tableName = dataPartitionQueryParam.getDeviceID().getTableName(); + assertEquals(StatementTestUtils.tableName(), tableName); + + TSeriesPartitionSlot partitionSlot = + seriesPartitionExecutor.getSeriesPartitionSlot( + dataPartitionQueryParam.getDeviceID()); + for (TTimePartitionSlot tTimePartitionSlot : + dataPartitionQueryParam.getTimePartitionSlotList()) { + dataPartitionMap + .computeIfAbsent(databaseName, d -> new HashMap<>()) + .computeIfAbsent(partitionSlot, slot -> new HashMap<>()) + .computeIfAbsent(tTimePartitionSlot, slot -> new ArrayList<>()) + .add( + new TRegionReplicaSet( + new TConsensusGroupId( + TConsensusGroupType.DataRegion, partitionSlot.slotId), + Collections.singletonList( + new TDataNodeLocation( + partitionSlot.slotId, null, null, null, null, null)))); + } + } + return new DataPartition(dataPartitionMap, partitionExecutorName, seriesSlotNum); + } + }; + } + + @Test + public void analyzeInsertTablet() { + Metadata mockMetadata = mockMetadataForInsertion(); + + InsertTabletStatement insertTabletStatement = StatementTestUtils.genInsertTabletStatement(true); + context = new MPPQueryContext("", queryId, sessionInfo, null, null); + actualAnalysis = + analyzeStatement( + insertTabletStatement.toRelationalStatement(context), + mockMetadata, + new SqlParser(), + sessionInfo); + assertEquals(1, actualAnalysis.getDataPartition().getDataPartitionMap().size()); + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> partitionSlotMapMap = actualAnalysis.getDataPartition() + .getDataPartitionMap().get(sessionInfo.getDatabaseName().orElse(null)); + assertEquals(3, partitionSlotMapMap.size()); + + logicalQueryPlan = + new LogicalPlanner(context, mockMetadata, sessionInfo, WarningCollector.NOOP) + .plan(actualAnalysis); + + RelationalInsertTabletNode insertTabletNode = + (RelationalInsertTabletNode) logicalQueryPlan.getRootNode(); + + assertEquals(insertTabletNode.getTableName(), StatementTestUtils.tableName()); + assertEquals(3, insertTabletNode.getRowCount()); + Object[] columns = StatementTestUtils.genColumns(); + for (int i = 0; i < insertTabletNode.getRowCount(); i++) { + assertEquals( + Factory.DEFAULT_FACTORY.create( + new String[] {StatementTestUtils.tableName(), ((String[]) columns[0])[i]}), + insertTabletNode.getDeviceID(i)); + } + assertArrayEquals(columns, insertTabletNode.getColumns()); + assertArrayEquals(StatementTestUtils.genTimestamps(), insertTabletNode.getTimes()); + + distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertEquals(3, distributedQueryPlan.getInstances().size()); + } + + @Test + public void analyzeInsertRow() { + Metadata mockMetadata = mockMetadataForInsertion(); + + InsertRowStatement insertStatement = StatementTestUtils.genInsertRowStatement(true); + context = new MPPQueryContext("", queryId, sessionInfo, null, null); + actualAnalysis = + analyzeStatement( + insertStatement.toRelationalStatement(context), + mockMetadata, + new SqlParser(), + sessionInfo); + assertEquals(1, actualAnalysis.getDataPartition().getDataPartitionMap().size()); + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> partitionSlotMapMap = actualAnalysis.getDataPartition() + .getDataPartitionMap().get(sessionInfo.getDatabaseName().orElse(null)); + assertEquals(1, partitionSlotMapMap.size()); + + logicalQueryPlan = + new LogicalPlanner(context, mockMetadata, sessionInfo, WarningCollector.NOOP) + .plan(actualAnalysis); + + RelationalInsertRowNode insertNode = + (RelationalInsertRowNode) logicalQueryPlan.getRootNode(); + + assertEquals(insertNode.getTableName(), StatementTestUtils.tableName()); + Object[] columns = StatementTestUtils.genValues(0); + assertEquals( + Factory.DEFAULT_FACTORY.create( + new String[] {StatementTestUtils.tableName(), ((String) columns[0])}), + insertNode.getDeviceID()); + + assertArrayEquals(columns, insertNode.getValues()); + assertEquals(StatementTestUtils.genTimestamps()[0], insertNode.getTime()); + + distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertEquals(1, distributedQueryPlan.getInstances().size()); + } + public static Analysis analyzeSQL(String sql, Metadata metadata) { + SqlParser sqlParser = new SqlParser(); + Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault()); + SessionInfo session = + new SessionInfo( + 0, "test", ZoneId.systemDefault(), "testdb", IClientSession.SqlDialect.TABLE); + return analyzeStatement(statement, metadata, sqlParser, session); + } + + public static Analysis analyzeStatement( + Statement statement, Metadata metadata, SqlParser sqlParser, SessionInfo session) { try { - SqlParser sqlParser = new SqlParser(); - Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault()); - SessionInfo session = - new SessionInfo( - 0, "test", ZoneId.systemDefault(), "testdb", IClientSession.SqlDialect.TABLE); StatementAnalyzerFactory statementAnalyzerFactory = new StatementAnalyzerFactory(metadata, sqlParser, nopAccessControl); diff --cc iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 16c728caf81,16c728caf81..cf5c3575674 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@@ -1075,6 -1075,6 +1075,11 @@@ service IConfigNodeRPCService */ TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq req) ++ /** ++ * Get SchemaPartitionTable by specific database name and series slots. ++ **/ ++ TSchemaPartitionTableResp getSchemaPartitionTableWithSlots(map<string, list<common.TSeriesPartitionSlot>> dbSlotMap) ++ /** * Get or create SchemaPartitionTable by specific PathPatternTree, * the returned SchemaPartitionTable always contains all the SeriesPartitionSlots @@@ -1086,6 -1086,6 +1091,11 @@@ */ TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req) ++ /** ++ * Get or create SchemaPartitionTable by specific database name and series slots. ++ **/ ++ TSchemaPartitionTableResp getOrCreateSchemaPartitionTableWithSlots(map<string, list<common.TSeriesPartitionSlot>> dbSlotMap) ++ // ====================================================== // Node Management // ======================================================
