This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryOpt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 512879e053fd3e690ef9984cc50ea9874a0b0fd7 Author: JackieTien97 <[email protected]> AuthorDate: Thu Nov 21 12:13:12 2024 +0800 Change LogicalOptimizer and DistributionOptimizer singleton --- .../iotdb/db/queryengine/plan/Coordinator.java | 35 ++++++++++++++++++++-- .../TableModelStatementMemorySourceVisitor.java | 7 +++-- .../plan/planner/TableOperatorGenerator.java | 3 +- .../relational/planner/TableLogicalPlanner.java | 16 +++++----- .../plan/relational/planner/TableModelPlanner.java | 19 ++++++++++-- .../distribute/TableDistributedPlanner.java | 21 +++++++++++-- 6 files changed, 81 insertions(+), 20 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 739c1e343ef..61be1471b15 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -44,9 +44,14 @@ import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor; import org.apache.iotdb.db.queryengine.plan.execution.config.TreeConfigTaskVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.TableModelPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DistributedOptimizeFactory; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LogicalOptimizeFactory; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; @@ -77,6 +82,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.utils.SetThreadName; @@ -131,11 +137,24 @@ public class Coordinator { private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap; + private final List<PlanOptimizer> logicalPlanOptimizers; + private final List<PlanOptimizer> distributionPlanOptimizers; + private Coordinator() { this.queryExecutionMap = new ConcurrentHashMap<>(); this.executor = getQueryExecutor(); this.writeOperationExecutor = getWriteExecutor(); this.scheduledExecutor = getScheduledExecutor(); + this.logicalPlanOptimizers = + new LogicalOptimizeFactory( + new PlannerContext( + LocalExecutionPlanner.getInstance().metadata, new InternalTypeManager())) + .getPlanOptimizers(); + this.distributionPlanOptimizers = + new DistributedOptimizeFactory( + new PlannerContext( + LocalExecutionPlanner.getInstance().metadata, new InternalTypeManager())) + .getPlanOptimizers(); } private ExecutionResult execution( @@ -312,7 +331,9 @@ public class Coordinator { writeOperationExecutor, scheduledExecutor, SYNC_INTERNAL_SERVICE_CLIENT_MANAGER, - ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER); + ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER, + logicalPlanOptimizers, + distributionPlanOptimizers); return new QueryExecution(tableModelPlanner, queryContext, executor); } @@ -373,7 +394,9 @@ public class Coordinator { writeOperationExecutor, scheduledExecutor, SYNC_INTERNAL_SERVICE_CLIENT_MANAGER, - ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER); + ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER, + logicalPlanOptimizers, + distributionPlanOptimizers); return new QueryExecution(tableModelPlanner, queryContext, executor); } @@ -461,4 +484,12 @@ public class Coordinator { } return -1L; } + + public List<PlanOptimizer> getDistributionPlanOptimizers() { + return distributionPlanOptimizers; + } + + public List<PlanOptimizer> getLogicalPlanOptimizers() { + return logicalPlanOptimizers; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java index d4c5c19a426..70f48ce3b25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.memory; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter; @@ -73,7 +74,8 @@ public class TableModelStatementMemorySourceVisitor LocalExecutionPlanner.getInstance().metadata, context.getQueryContext().getSession(), symbolAllocator, - NOOP) + NOOP, + Coordinator.getInstance().getLogicalPlanOptimizers()) .plan(context.getAnalysis()); // if (context.getAnalysis().isEmptyDataSource()) { // return new StatementMemorySource(new TsBlock(0), header); @@ -87,7 +89,8 @@ public class TableModelStatementMemorySourceVisitor context.getAnalysis(), symbolAllocator, logicalPlan, - LocalExecutionPlanner.getInstance().metadata) + LocalExecutionPlanner.getInstance().metadata, + Coordinator.getInstance().getDistributionPlanOptimizers()) .generateDistributedPlanWithOptimize(planContext); final List<String> lines = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index d55417013b0..fe0e8671f4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -66,7 +66,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryScan import org.apache.iotdb.db.queryengine.execution.operator.schema.source.DevicePredicateFilter; import org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory; import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; -import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableAggregationTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableFullOuterJoinOperator; @@ -384,7 +383,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution .addOperatorContext( context.getNextOperatorId(), node.getPlanNodeId(), - AlignedSeriesScanOperator.class.getSimpleName()); + TableScanNode.class.getSimpleName()); int maxTsBlockLineNum = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); if (context.getTypeProvider().getTemplatedInfo() != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java index 238fd0a9051..300f998dc4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java @@ -86,23 +86,23 @@ public class TableLogicalPlanner { private final Metadata metadata; private final WarningCollector warningCollector; + @TestOnly public TableLogicalPlanner( MPPQueryContext queryContext, Metadata metadata, SessionInfo sessionInfo, SymbolAllocator symbolAllocator, WarningCollector warningCollector) { - this.queryContext = queryContext; - this.metadata = metadata; - this.sessionInfo = requireNonNull(sessionInfo, "session is null"); - this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); - this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); - this.planOptimizers = + this( + queryContext, + metadata, + sessionInfo, + symbolAllocator, + warningCollector, new LogicalOptimizeFactory(new PlannerContext(metadata, new InternalTypeManager())) - .getPlanOptimizers(); + .getPlanOptimizers()); } - @TestOnly public TableLogicalPlanner( MPPQueryContext queryContext, Metadata metadata, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index b316d56f2a5..a7f0878041e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; @@ -63,6 +64,8 @@ public class TableModelPlanner implements IPlanner { private final SqlParser sqlParser; private final Metadata metadata; + private final List<PlanOptimizer> logicalPlanOptimizers; + private final List<PlanOptimizer> distributionPlanOptimizers; private final SymbolAllocator symbolAllocator = new SymbolAllocator(); // TODO access control @@ -90,7 +93,9 @@ public class TableModelPlanner implements IPlanner { final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> - asyncInternalServiceClientManager) { + asyncInternalServiceClientManager, + final List<PlanOptimizer> logicalPlanOptimizers, + final List<PlanOptimizer> distributionPlanOptimizers) { this.statement = statement; this.sqlParser = sqlParser; this.metadata = metadata; @@ -99,6 +104,8 @@ public class TableModelPlanner implements IPlanner { this.scheduledExecutor = scheduledExecutor; this.syncInternalServiceClientManager = syncInternalServiceClientManager; this.asyncInternalServiceClientManager = asyncInternalServiceClientManager; + this.logicalPlanOptimizers = logicalPlanOptimizers; + this.distributionPlanOptimizers = distributionPlanOptimizers; } @Override @@ -116,14 +123,20 @@ public class TableModelPlanner implements IPlanner { @Override public LogicalQueryPlan doLogicalPlan(final IAnalysis analysis, final MPPQueryContext context) { return new TableLogicalPlanner( - context, metadata, context.getSession(), symbolAllocator, warningCollector) + context, + metadata, + context.getSession(), + symbolAllocator, + warningCollector, + logicalPlanOptimizers) .plan((Analysis) analysis); } @Override public DistributedQueryPlan doDistributionPlan( final IAnalysis analysis, final LogicalQueryPlan logicalPlan) { - return new TableDistributedPlanner((Analysis) analysis, symbolAllocator, logicalPlan, metadata) + return new TableDistributedPlanner( + (Analysis) analysis, symbolAllocator, logicalPlan, metadata, distributionPlanOptimizers) .plan(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index 934f4ec2b15..a485f3814f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -14,6 +14,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; @@ -58,18 +59,32 @@ public class TableDistributedPlanner { private final List<PlanOptimizer> optimizers; private final Metadata metadata; + @TestOnly public TableDistributedPlanner( Analysis analysis, SymbolAllocator symbolAllocator, LogicalQueryPlan logicalQueryPlan, Metadata metadata) { + this( + analysis, + symbolAllocator, + logicalQueryPlan, + metadata, + new DistributedOptimizeFactory(new PlannerContext(metadata, new InternalTypeManager())) + .getPlanOptimizers()); + } + + public TableDistributedPlanner( + Analysis analysis, + SymbolAllocator symbolAllocator, + LogicalQueryPlan logicalQueryPlan, + Metadata metadata, + List<PlanOptimizer> distributedOptimizers) { this.analysis = analysis; this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); this.logicalQueryPlan = logicalQueryPlan; this.mppQueryContext = logicalQueryPlan.getContext(); - this.optimizers = - new DistributedOptimizeFactory(new PlannerContext(metadata, new InternalTypeManager())) - .getPlanOptimizers(); + this.optimizers = distributedOptimizers; this.metadata = metadata; }
