This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch new-table-model-debug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b000107b97bb649f96b99fa49af92d47a9438b42 Author: Beyyes <[email protected]> AuthorDate: Thu May 30 10:58:30 2024 +0800 merge with ty/TabelGrammarModel --- .../plan/relational/planner/LogicalPlanner.java | 8 +- .../relational/planner/RelationalModelPlanner.java | 13 +- ...nPlanner.java => TableDistributionPlanner.java} | 4 +- .../planner/optimizations/IndexScan.java | 53 +++---- .../optimizations/PruneTableScanColumns.java | 2 + .../optimizations/RelationalPlanOptimizer.java | 2 + .../RemoveRedundantIdentityProjections.java | 2 + .../planner/optimizations/SimplifyExpressions.java | 2 + .../plan/statement/crud/InsertTableStatement.java | 3 +- .../plan/relational/analyzer/AnalyzerTest.java | 162 ++++++++++++++++--- .../relational/analyzer/MockTablePartition.java | 174 +++++++++++++++++++++ .../plan/relational/analyzer/TestMatadata.java | 4 +- 12 files changed, 361 insertions(+), 68 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index 9190bcf5928..08e006c07f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@ -20,6 +20,7 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; @@ -59,16 +60,19 @@ public class LogicalPlanner { private final SymbolAllocator symbolAllocator = new SymbolAllocator(); private final List<RelationalPlanOptimizer> relationalPlanOptimizers; private final Metadata metadata; + private final IPartitionFetcher partitionFetcher; private final WarningCollector warningCollector; public LogicalPlanner( MPPQueryContext context, Metadata metadata, SessionInfo sessionInfo, + IPartitionFetcher partitionFetcher, WarningCollector warningCollector) { this.context = context; this.metadata = metadata; this.sessionInfo = requireNonNull(sessionInfo, "session is null"); + this.partitionFetcher = requireNonNull(partitionFetcher, "partitionFetcher is null"); this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); this.relationalPlanOptimizers = @@ -83,7 +87,9 @@ public class LogicalPlanner { PlanNode planNode = planStatement(analysis, analysis.getStatement()); relationalPlanOptimizers.forEach( - optimizer -> optimizer.optimize(planNode, analysis, metadata, sessionInfo, context)); + optimizer -> + optimizer.optimize( + planNode, analysis, metadata, partitionFetcher, sessionInfo, context)); return new LogicalQueryPlan(context, planNode); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java index f807fd19dfe..4181e058644 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; +import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.IPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; @@ -36,7 +37,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; -import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.RelationalDistributionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; @@ -108,7 +109,12 @@ public class RelationalModelPlanner implements IPlanner { @Override public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext context) { try { - return new LogicalPlanner(context, metadata, context.getSession(), warningCollector) + return new LogicalPlanner( + context, + metadata, + context.getSession(), + ClusterPartitionFetcher.getInstance(), + warningCollector) .plan((Analysis) analysis); } catch (IoTDBException e) { throw new RuntimeException(e); @@ -117,8 +123,7 @@ public class RelationalModelPlanner implements IPlanner { @Override public DistributedQueryPlan doDistributionPlan(IAnalysis analysis, LogicalQueryPlan logicalPlan) { - return new RelationalDistributionPlanner( - (Analysis) analysis, logicalPlan, logicalPlan.getContext()) + return new TableDistributionPlanner((Analysis) analysis, logicalPlan, logicalPlan.getContext()) .plan(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java similarity index 98% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java index f9dd7e85cec..806db78a2c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java @@ -36,12 +36,12 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; -public class RelationalDistributionPlanner { +public class TableDistributionPlanner { private final Analysis analysis; private final LogicalQueryPlan logicalQueryPlan; private final MPPQueryContext mppQueryContext; - public RelationalDistributionPlanner( + public TableDistributionPlanner( Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext mppQueryContext) { this.analysis = analysis; this.logicalQueryPlan = logicalQueryPlan; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java index 0e1ccefde5d..7caa699f0af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java @@ -14,8 +14,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; @@ -25,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; -import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; @@ -56,7 +53,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE; import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList; -/** Extract IDeviceID and */ +/** Extract IDeviceID */ public class IndexScan implements RelationalPlanOptimizer { static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); @@ -68,11 +65,13 @@ public class IndexScan implements RelationalPlanOptimizer { PlanNode planNode, Analysis analysis, Metadata metadata, + IPartitionFetcher partitionFetcher, SessionInfo sessionInfo, MPPQueryContext context) { return planNode.accept( - new Rewriter(), new RewriterContext(null, metadata, sessionInfo, analysis)); + new Rewriter(), + new RewriterContext(null, metadata, sessionInfo, analysis, partitionFetcher)); } private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> { @@ -100,21 +99,18 @@ public class IndexScan implements RelationalPlanOptimizer { .collect(Collectors.toList()); List<Expression> conjExpressions = getConjunctionExpressions(context.getPredicate(), node); - + String dbName = context.getSessionInfo().getDatabaseName().get(); List<DeviceEntry> deviceEntries = context .getMetadata() .indexScan( - new QualifiedObjectName( - context.getSessionInfo().getDatabaseName().get(), - node.getQualifiedTableName()), + new QualifiedObjectName(dbName, node.getQualifiedTableName()), conjExpressions, attributeColumns); node.setDeviceEntries(deviceEntries); // TODO getDataPartition, Change globalTimeFilter to Filter - String database = "root." + context.getSessionInfo().getDatabaseName().get(); - IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance(); + String treeDatabase = "root." + dbName; Filter globalTimeFilter = null; Set<String> deviceSet = new HashSet<>(); for (DeviceEntry deviceEntry : deviceEntries) { @@ -124,18 +120,18 @@ public class IndexScan implements RelationalPlanOptimizer { } DataPartition dataPartition = - fetchDataPartitionByDevices(deviceSet, database, globalTimeFilter, partitionFetcher); + fetchDataPartitionByDevices( + deviceSet, treeDatabase, globalTimeFilter, context.partitionFetcher); context.getAnalysis().setDataPartition(dataPartition); if (dataPartition.getDataPartitionMap().size() > 1) { throw new IllegalStateException( - "Table model can only process data only in one data region yet!"); + "Table model can only process data only in one database yet!"); } if (dataPartition.getDataPartitionMap().isEmpty()) { context.getAnalysis().setFinishQueryAfterAnalyze(); } else { - // TODO add the real impl Set<TRegionReplicaSet> regionReplicaSet = new HashSet<>(); for (Map.Entry< String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> @@ -193,6 +189,7 @@ public class IndexScan implements RelationalPlanOptimizer { IPartitionFetcher partitionFetcher) { Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res = getTimePartitionSlotList(globalTimeFilter); + // there is no satisfied time range if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) { return new DataPartition( @@ -200,6 +197,7 @@ public class IndexScan implements RelationalPlanOptimizer { CONFIG.getSeriesPartitionExecutorClass(), CONFIG.getSeriesPartitionSlotNum()); } + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); for (String devicePath : deviceSet) { DataPartitionQueryParam queryParam = @@ -207,15 +205,6 @@ public class IndexScan implements RelationalPlanOptimizer { sgNameToQueryParamsMap.computeIfAbsent(database, key -> new ArrayList<>()).add(queryParam); } - // Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> - // dataPartitionMap = new HashMap<>(); - // dataPartitionMap.put("root.db", Collections.singletonMap(new TSeriesPartitionSlot(1), - // Collections.singletonMap(new TTimePartitionSlot(1), - // Collections.singletonList(new TRegionReplicaSet( - // new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), - // Arrays.asList(genDataNodeLocation(1, "127.0.0.1"))))))); - // return new DataPartition(dataPartitionMap, "hkb", 1); - if (res.right.left || res.right.right) { return partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap); } else { @@ -223,26 +212,24 @@ public class IndexScan implements RelationalPlanOptimizer { } } - private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) { - return new TDataNodeLocation() - .setDataNodeId(dataNodeId) - .setClientRpcEndPoint(new TEndPoint(ip, 9000)) - .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001)) - .setInternalEndPoint(new TEndPoint(ip, 9002)); - } - private static class RewriterContext { private Expression predicate; private Metadata metadata; private final SessionInfo sessionInfo; - private Analysis analysis; + private final Analysis analysis; + private final IPartitionFetcher partitionFetcher; RewriterContext( - Expression predicate, Metadata metadata, SessionInfo sessionInfo, Analysis analysis) { + Expression predicate, + Metadata metadata, + SessionInfo sessionInfo, + Analysis analysis, + IPartitionFetcher partitionFetcher) { this.predicate = predicate; this.metadata = metadata; this.sessionInfo = sessionInfo; this.analysis = analysis; + this.partitionFetcher = partitionFetcher; } public Expression getPredicate() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java index dc75af69b26..5b8a709b738 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; @@ -39,6 +40,7 @@ public class PruneTableScanColumns implements RelationalPlanOptimizer { PlanNode planNode, Analysis analysis, Metadata metadata, + IPartitionFetcher partitionFetcher, SessionInfo sessionInfo, MPPQueryContext context) { return planNode.accept(new Rewriter(), new RewriterContext()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java index 37611e6a35c..0e96f27be60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; @@ -25,6 +26,7 @@ public interface RelationalPlanOptimizer { PlanNode planNode, Analysis analysis, Metadata metadata, + IPartitionFetcher partitionFetcher, SessionInfo sessionInfo, MPPQueryContext context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java index 67027079a17..28b5ab8d1d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; @@ -34,6 +35,7 @@ public class RemoveRedundantIdentityProjections implements RelationalPlanOptimiz PlanNode planNode, Analysis analysis, Metadata metadata, + IPartitionFetcher partitionFetcher, SessionInfo sessionInfo, MPPQueryContext context) { return planNode.accept(new Rewriter(), new RewriterContext()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java index 782c9705136..7033af9e6c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; @@ -34,6 +35,7 @@ public class SimplifyExpressions implements RelationalPlanOptimizer { PlanNode planNode, Analysis analysis, Metadata metadata, + IPartitionFetcher partitionFetcher, SessionInfo sessionInfo, MPPQueryContext context) { // TODO add query statement pruning diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTableStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTableStatement.java index 4fe5fac42ab..fac59611556 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTableStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTableStatement.java @@ -165,7 +165,8 @@ public class InsertTableStatement extends Statement implements ITableDeviceSchem measurements[measurementIndex] = measurement; valueList[measurementIndex] = measurementColumnMap.get(measurement); schemas[measurementIndex] = - ((MeasurementColumnSchema) table.getColumnList().get(i)).getMeasurementSchema(); + (MeasurementSchema) + ((MeasurementColumnSchema) table.getColumnList().get(i)).getMeasurementSchema(); dataTypes[measurementIndex] = schemas[measurementIndex].getType(); measurementIndex++; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java index fd670beb30b..ca305d9063c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java @@ -21,13 +21,20 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.DataPartitionQueryParam; +import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition; +import org.apache.iotdb.commons.partition.SchemaPartition; +import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; @@ -37,12 +44,14 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableHandle; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner; -import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.RelationalDistributionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.relational.sql.parser.SqlParser; import org.apache.iotdb.db.relational.sql.tree.Statement; +import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; @@ -55,11 +64,14 @@ import java.util.Map; import java.util.Optional; import static org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector.NOOP; +import static org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC; import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; import static org.apache.tsfile.read.common.type.DoubleType.DOUBLE; import static org.apache.tsfile.read.common.type.IntType.INT32; import static org.apache.tsfile.read.common.type.LongType.INT64; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.eq; @@ -133,38 +145,71 @@ public class AnalyzerTest { System.out.println(actualAnalysis.getTypes()); } - @Ignore - @Test - public void testSingleTableQuery() throws IoTDBException { - // no sort - String sql = "SELECT tag1, s1 FROM table1"; - // + "WHERE time>1 AND tag1='A' OR s2>3"; - Metadata metadata = new TestMatadata(); + QueryId queryId = new QueryId("tmp_query"); + SessionInfo sessionInfo = + new SessionInfo( + 1L, + "iotdb-user", + ZoneId.systemDefault(), + IoTDBConstant.ClientVersion.V_1_0, + "db", + IClientSession.SqlDialect.TABLE); + Metadata metadata = new TestMatadata(); + @Test + public void singleTableNoFilterTest() throws IoTDBException { + // 1. wildcard + String sql = "SELECT * FROM table1"; Analysis actualAnalysis = analyzeSQL(sql, metadata); assertNotNull(actualAnalysis); - System.out.println(actualAnalysis.getTypes()); + assertEquals(1, actualAnalysis.getTables().size()); - QueryId queryId = new QueryId("tmp_query"); - MPPQueryContext context = new MPPQueryContext(queryId); - SessionInfo sessionInfo = - new SessionInfo( - 1L, - "iotdb-user", - ZoneId.systemDefault(), - IoTDBConstant.ClientVersion.V_1_0, - "db", - IClientSession.SqlDialect.TABLE); - WarningCollector warningCollector = WarningCollector.NOOP; + MPPQueryContext context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); LogicalPlanner logicalPlanner = - new LogicalPlanner(context, metadata, sessionInfo, warningCollector); + new LogicalPlanner( + context, metadata, sessionInfo, getFakePartitionFetcher(), WarningCollector.NOOP); LogicalQueryPlan logicalQueryPlan = logicalPlanner.plan(actualAnalysis); - System.out.println(logicalQueryPlan); + PlanNode rootNode = logicalQueryPlan.getRootNode(); + assertTrue(rootNode instanceof OutputNode); + assertTrue(((OutputNode) rootNode).getChild() instanceof TableScanNode); + TableScanNode tableScanNode = (TableScanNode) ((OutputNode) rootNode).getChild(); + assertEquals("table1", tableScanNode.getQualifiedTableName()); + assertEquals(8, tableScanNode.getOutputSymbols().size()); + assertEquals(8, tableScanNode.getAssignments().size()); + assertEquals(1, tableScanNode.getDeviceEntries().size()); + assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); + assertEquals(ASC, tableScanNode.getScanOrder()); - RelationalDistributionPlanner distributionPlanner = - new RelationalDistributionPlanner(actualAnalysis, logicalQueryPlan, context); + TableDistributionPlanner distributionPlanner = + new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); DistributedQueryPlan distributedQueryPlan = distributionPlanner.plan(); - System.out.println(distributedQueryPlan); + assertEquals(4, distributedQueryPlan.getInstances().size()); + + // 2. global time filter + sql = "SELECT * FROM table1 where time > 1"; + actualAnalysis = analyzeSQL(sql, metadata); + assertNotNull(actualAnalysis); + assertEquals(1, actualAnalysis.getTables().size()); + + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + logicalPlanner = + new LogicalPlanner( + context, metadata, sessionInfo, getFakePartitionFetcher(), WarningCollector.NOOP); + logicalQueryPlan = logicalPlanner.plan(actualAnalysis); + rootNode = logicalQueryPlan.getRootNode(); + assertTrue(rootNode instanceof OutputNode); + assertTrue(((OutputNode) rootNode).getChild() instanceof TableScanNode); + tableScanNode = (TableScanNode) ((OutputNode) rootNode).getChild(); + assertEquals("table1", tableScanNode.getQualifiedTableName()); + assertEquals(8, tableScanNode.getOutputSymbols().size()); + assertEquals(8, tableScanNode.getAssignments().size()); + assertEquals(1, tableScanNode.getDeviceEntries().size()); + assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); + assertEquals(ASC, tableScanNode.getScanOrder()); + + distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertEquals(4, distributedQueryPlan.getInstances().size()); } public static Analysis analyzeSQL(String sql, Metadata metadata) { @@ -193,5 +238,70 @@ public class AnalyzerTest { return null; } + private static final DataPartition DATA_PARTITION = MockTablePartition.constructDataPartition(); + private static final SchemaPartition SCHEMA_PARTITION = + MockTablePartition.constructSchemaPartition(); + + private static IPartitionFetcher getFakePartitionFetcher() { + + return new IPartitionFetcher() { + + @Override + public SchemaPartition getSchemaPartition(PathPatternTree patternTree) { + return SCHEMA_PARTITION; + } + + @Override + public SchemaPartition getOrCreateSchemaPartition( + PathPatternTree patternTree, String userName) { + return SCHEMA_PARTITION; + } + + @Override + public DataPartition getDataPartition( + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) { + return DATA_PARTITION; + } + + @Override + public DataPartition getDataPartitionWithUnclosedTimeRange( + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) { + return DATA_PARTITION; + } + + @Override + public DataPartition getOrCreateDataPartition( + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) { + return DATA_PARTITION; + } + + @Override + public DataPartition getOrCreateDataPartition( + List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) { + return DATA_PARTITION; + } + + @Override + public SchemaNodeManagementPartition getSchemaNodeManagementPartition( + PathPatternTree patternTree, PathPatternTree scope) { + return IPartitionFetcher.super.getSchemaNodeManagementPartition(patternTree, scope); + } + + @Override + public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel( + PathPatternTree patternTree, PathPatternTree scope, Integer level) { + return null; + } + + @Override + public boolean updateRegionCache(TRegionRouteReq req) { + return false; + } + + @Override + public void invalidAllCache() {} + }; + } + private static class NopAccessControl implements AccessControl {} } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java new file mode 100644 index 00000000000..ae24f4a219e --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.analyzer; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.SchemaPartition; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockTablePartition { + + private static final SeriesPartitionExecutor EXECUTOR = + SeriesPartitionExecutor.getSeriesPartitionExecutor( + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); + + private static final String DB_NAME = "root.testdb"; + + private static final String device1 = "root.testdb.d1"; + private static final String device2 = "root.testdb.d22"; + private static final String device3 = "root.testdb.d333"; + private static final String device4 = "root.testdb.d4444"; + private static final String device5 = "root.testdb.d55555"; + private static final String device6 = "root.testdb.d666666"; + + public static DataPartition constructDataPartition() { + TRegionReplicaSet dataRegion1 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), + Arrays.asList( + genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2"))); + + TRegionReplicaSet dataRegion2 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 2), + Arrays.asList( + genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); + + TRegionReplicaSet dataRegion3 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 3), + Arrays.asList( + genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2"))); + + TRegionReplicaSet dataRegion4 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 4), + Arrays.asList( + genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2"))); + + DataPartition dataPartition = + new DataPartition( + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); + + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + dataPartitionMap = new HashMap<>(); + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap = + new HashMap<>(); + + List<TRegionReplicaSet> d1DataRegions = new ArrayList<>(); + d1DataRegions.add(dataRegion1); + d1DataRegions.add(dataRegion2); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>(); + d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions); + + List<TRegionReplicaSet> d2DataRegions = new ArrayList<>(); + d2DataRegions.add(dataRegion3); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>(); + d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions); + + List<TRegionReplicaSet> d3DataRegions = new ArrayList<>(); + d3DataRegions.add(dataRegion1); + d3DataRegions.add(dataRegion4); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>(); + d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions); + + List<TRegionReplicaSet> d4DataRegions = new ArrayList<>(); + d4DataRegions.add(dataRegion1); + d4DataRegions.add(dataRegion4); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>(); + d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions); + + List<TRegionReplicaSet> d5DataRegions = new ArrayList<>(); + d5DataRegions.add(dataRegion4); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new HashMap<>(); + d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions); + + List<TRegionReplicaSet> d6DataRegions = new ArrayList<>(); + d6DataRegions.add(dataRegion1); + d6DataRegions.add(dataRegion2); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> d6DataRegionMap = new HashMap<>(); + d6DataRegionMap.put(new TTimePartitionSlot(), d6DataRegions); + + sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device1), d1DataRegionMap); + sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device2), d2DataRegionMap); + sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device3), d3DataRegionMap); + sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device4), d4DataRegionMap); + sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device5), d5DataRegionMap); + sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device6), d6DataRegionMap); + + dataPartitionMap.put(DB_NAME, sgPartitionMap); + dataPartition.setDataPartitionMap(dataPartitionMap); + + return dataPartition; + } + + public static SchemaPartition constructSchemaPartition() { + SchemaPartition schemaPartition = + new SchemaPartition( + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); + Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>(); + + TRegionReplicaSet schemaRegion1 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11), + Arrays.asList( + genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2"))); + + TRegionReplicaSet schemaRegion2 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21), + Arrays.asList( + genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); + + Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); + schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device1), schemaRegion1); + schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device2), schemaRegion2); + schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device3), schemaRegion2); + schemaPartitionMap.put(DB_NAME, schemaRegionMap); + schemaPartition.setSchemaPartitionMap(schemaPartitionMap); + + return schemaPartition; + } + + private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) { + return new TDataNodeLocation() + .setDataNodeId(dataNodeId) + .setClientRpcEndPoint(new TEndPoint(ip, 9000)) + .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001)) + .setInternalEndPoint(new TEndPoint(ip, 9002)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java index ce1bc0d0ce1..805fab985d0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java @@ -283,7 +283,9 @@ public class TestMatadata implements Metadata { List<Expression> expressionList, List<String> attributeColumns) { return Arrays.asList( - new DeviceEntry(new StringArrayDeviceID("t1", "t2", "t3"), Arrays.asList("a1", "a2"))); + new DeviceEntry( + new StringArrayDeviceID("root.testdb", "table1", "t1", "t2", "t3"), + Arrays.asList("a1", "a2"))); } public static boolean isTwoNumericType(List<? extends Type> argumentTypes) {
