This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1b33c55153a Change LogicalOptimizer and DistributionOptimizer
singleton (#14158)
1b33c55153a is described below
commit 1b33c55153a401b16ae475cc0533e683906b4d16
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Nov 21 15:27:53 2024 +0800
Change LogicalOptimizer and DistributionOptimizer singleton (#14158)
---
.../iotdb/db/queryengine/plan/Coordinator.java | 35 ++++++++++++++++++++--
.../TableModelStatementMemorySourceVisitor.java | 7 +++--
.../plan/planner/TableOperatorGenerator.java | 3 +-
.../relational/planner/TableLogicalPlanner.java | 16 +++++-----
.../plan/relational/planner/TableModelPlanner.java | 19 ++++++++++--
.../distribute/TableDistributedPlanner.java | 21 +++++++++++--
6 files changed, 81 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 739c1e343ef..61be1471b15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -44,9 +44,14 @@ import
org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution;
import
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor;
import
org.apache.iotdb.db.queryengine.plan.execution.config.TreeConfigTaskVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.TableModelPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DistributedOptimizeFactory;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LogicalOptimizeFactory;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB;
@@ -77,6 +82,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -131,11 +137,24 @@ public class Coordinator {
private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap;
+ private final List<PlanOptimizer> logicalPlanOptimizers;
+ private final List<PlanOptimizer> distributionPlanOptimizers;
+
private Coordinator() {
this.queryExecutionMap = new ConcurrentHashMap<>();
this.executor = getQueryExecutor();
this.writeOperationExecutor = getWriteExecutor();
this.scheduledExecutor = getScheduledExecutor();
+ this.logicalPlanOptimizers =
+ new LogicalOptimizeFactory(
+ new PlannerContext(
+ LocalExecutionPlanner.getInstance().metadata, new
InternalTypeManager()))
+ .getPlanOptimizers();
+ this.distributionPlanOptimizers =
+ new DistributedOptimizeFactory(
+ new PlannerContext(
+ LocalExecutionPlanner.getInstance().metadata, new
InternalTypeManager()))
+ .getPlanOptimizers();
}
private ExecutionResult execution(
@@ -312,7 +331,9 @@ public class Coordinator {
writeOperationExecutor,
scheduledExecutor,
SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
- ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
+ ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+ logicalPlanOptimizers,
+ distributionPlanOptimizers);
return new QueryExecution(tableModelPlanner, queryContext, executor);
}
@@ -373,7 +394,9 @@ public class Coordinator {
writeOperationExecutor,
scheduledExecutor,
SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
- ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
+ ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+ logicalPlanOptimizers,
+ distributionPlanOptimizers);
return new QueryExecution(tableModelPlanner, queryContext, executor);
}
@@ -461,4 +484,12 @@ public class Coordinator {
}
return -1L;
}
+
+ public List<PlanOptimizer> getDistributionPlanOptimizers() {
+ return distributionPlanOptimizers;
+ }
+
+ public List<PlanOptimizer> getLogicalPlanOptimizers() {
+ return logicalPlanOptimizers;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
index d4c5c19a426..70f48ce3b25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.memory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
@@ -73,7 +74,8 @@ public class TableModelStatementMemorySourceVisitor
LocalExecutionPlanner.getInstance().metadata,
context.getQueryContext().getSession(),
symbolAllocator,
- NOOP)
+ NOOP,
+ Coordinator.getInstance().getLogicalPlanOptimizers())
.plan(context.getAnalysis());
// if (context.getAnalysis().isEmptyDataSource()) {
// return new StatementMemorySource(new TsBlock(0), header);
@@ -87,7 +89,8 @@ public class TableModelStatementMemorySourceVisitor
context.getAnalysis(),
symbolAllocator,
logicalPlan,
- LocalExecutionPlanner.getInstance().metadata)
+ LocalExecutionPlanner.getInstance().metadata,
+ Coordinator.getInstance().getDistributionPlanOptimizers())
.generateDistributedPlanWithOptimize(planContext);
final List<String> lines =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index d55417013b0..fe0e8671f4a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -66,7 +66,6 @@ import
org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryScan
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.DevicePredicateFilter;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
import
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableAggregationTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableFullOuterJoinOperator;
@@ -384,7 +383,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- AlignedSeriesScanOperator.class.getSimpleName());
+ TableScanNode.class.getSimpleName());
int maxTsBlockLineNum =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
if (context.getTypeProvider().getTemplatedInfo() != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
index 4d11975aab9..2692690c82f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
@@ -86,23 +86,23 @@ public class TableLogicalPlanner {
private final Metadata metadata;
private final WarningCollector warningCollector;
+ @TestOnly
public TableLogicalPlanner(
MPPQueryContext queryContext,
Metadata metadata,
SessionInfo sessionInfo,
SymbolAllocator symbolAllocator,
WarningCollector warningCollector) {
- this.queryContext = queryContext;
- this.metadata = metadata;
- this.sessionInfo = requireNonNull(sessionInfo, "session is null");
- this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is
null");
- this.warningCollector = requireNonNull(warningCollector, "warningCollector
is null");
- this.planOptimizers =
+ this(
+ queryContext,
+ metadata,
+ sessionInfo,
+ symbolAllocator,
+ warningCollector,
new LogicalOptimizeFactory(new PlannerContext(metadata, new
InternalTypeManager()))
- .getPlanOptimizers();
+ .getPlanOptimizers());
}
- @TestOnly
public TableLogicalPlanner(
MPPQueryContext queryContext,
Metadata metadata,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index b316d56f2a5..a7f0878041e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer;
import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
@@ -63,6 +64,8 @@ public class TableModelPlanner implements IPlanner {
private final SqlParser sqlParser;
private final Metadata metadata;
+ private final List<PlanOptimizer> logicalPlanOptimizers;
+ private final List<PlanOptimizer> distributionPlanOptimizers;
private final SymbolAllocator symbolAllocator = new SymbolAllocator();
// TODO access control
@@ -90,7 +93,9 @@ public class TableModelPlanner implements IPlanner {
final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
- asyncInternalServiceClientManager) {
+ asyncInternalServiceClientManager,
+ final List<PlanOptimizer> logicalPlanOptimizers,
+ final List<PlanOptimizer> distributionPlanOptimizers) {
this.statement = statement;
this.sqlParser = sqlParser;
this.metadata = metadata;
@@ -99,6 +104,8 @@ public class TableModelPlanner implements IPlanner {
this.scheduledExecutor = scheduledExecutor;
this.syncInternalServiceClientManager = syncInternalServiceClientManager;
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
+ this.logicalPlanOptimizers = logicalPlanOptimizers;
+ this.distributionPlanOptimizers = distributionPlanOptimizers;
}
@Override
@@ -116,14 +123,20 @@ public class TableModelPlanner implements IPlanner {
@Override
public LogicalQueryPlan doLogicalPlan(final IAnalysis analysis, final
MPPQueryContext context) {
return new TableLogicalPlanner(
- context, metadata, context.getSession(), symbolAllocator,
warningCollector)
+ context,
+ metadata,
+ context.getSession(),
+ symbolAllocator,
+ warningCollector,
+ logicalPlanOptimizers)
.plan((Analysis) analysis);
}
@Override
public DistributedQueryPlan doDistributionPlan(
final IAnalysis analysis, final LogicalQueryPlan logicalPlan) {
- return new TableDistributedPlanner((Analysis) analysis, symbolAllocator,
logicalPlan, metadata)
+ return new TableDistributedPlanner(
+ (Analysis) analysis, symbolAllocator, logicalPlan, metadata,
distributionPlanOptimizers)
.plan();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
index 934f4ec2b15..a485f3814f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
@@ -14,6 +14,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
@@ -58,18 +59,32 @@ public class TableDistributedPlanner {
private final List<PlanOptimizer> optimizers;
private final Metadata metadata;
+ @TestOnly
public TableDistributedPlanner(
Analysis analysis,
SymbolAllocator symbolAllocator,
LogicalQueryPlan logicalQueryPlan,
Metadata metadata) {
+ this(
+ analysis,
+ symbolAllocator,
+ logicalQueryPlan,
+ metadata,
+ new DistributedOptimizeFactory(new PlannerContext(metadata, new
InternalTypeManager()))
+ .getPlanOptimizers());
+ }
+
+ public TableDistributedPlanner(
+ Analysis analysis,
+ SymbolAllocator symbolAllocator,
+ LogicalQueryPlan logicalQueryPlan,
+ Metadata metadata,
+ List<PlanOptimizer> distributedOptimizers) {
this.analysis = analysis;
this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is
null");
this.logicalQueryPlan = logicalQueryPlan;
this.mppQueryContext = logicalQueryPlan.getContext();
- this.optimizers =
- new DistributedOptimizeFactory(new PlannerContext(metadata, new
InternalTypeManager()))
- .getPlanOptimizers();
+ this.optimizers = distributedOptimizers;
this.metadata = metadata;
}