This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fc55b20473a47058322c29ba4e92f970be289320 Author: Alima777 <[email protected]> AuthorDate: Thu Feb 9 22:34:33 2023 +0800 Fix schema pipeline bugs --- .../mpp/execution/fragment/FragmentInstanceManager.java | 14 +++++++++----- .../db/mpp/plan/planner/LocalExecutionPlanContext.java | 4 ++++ .../db/mpp/plan/planner/LocalExecutionPlanner.java | 17 ++++------------- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 2 +- .../db/mpp/plan/planner/PipelineDriverFactory.java | 10 +++++++++- 5 files changed, 27 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index efa8db0bf1..794174b939 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery; import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.driver.IDriver; -import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver; import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle; import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; @@ -41,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -195,14 +193,20 @@ public class FragmentInstanceManager { fragmentInstanceId, stateMachine, instance.getSessionInfo())); try { - SchemaDriver driver = + List<PipelineDriverFactory> driverFactories = planner.plan(instance.getFragment().getPlanNodeTree(), context, schemaRegion); + + List<IDriver> drivers = new ArrayList<>(); + driverFactories.forEach(factory -> drivers.add(factory.createDriver())); + // get the sinkHandle of last driver + ISinkHandle sinkHandle = drivers.get(drivers.size() - 1).getSinkHandle(); + return createFragmentInstanceExecution( scheduler, instanceId, context, - Collections.singletonList(driver), - driver.getSinkHandle(), + drivers, + sinkHandle, stateMachine, failedInstances, instance.getTimeOut()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java index 3132783e3e..2926e6a261 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java @@ -112,10 +112,14 @@ public class LocalExecutionPlanContext { this.allSensorsMap = new ConcurrentHashMap<>(); this.typeProvider = null; this.nextOperatorId = new AtomicInteger(0); + this.nextPipelineId = new AtomicInteger(0); // there is no ttl in schema region, so we don't care this field this.dataRegionTTL = Long.MAX_VALUE; this.driverContext = new SchemaDriverContext(instanceContext, schemaRegion); + this.pipelineDriverFactories = new ArrayList<>(); + // TODO combine with SchemaDriverContext + this.getNextPipelineId(); } public void addPipelineDriverFactory(Operator operation, DriverContext driverContext) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 231ca1d680..91b3c5187f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -24,12 +24,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException; import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext; -import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver; -import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.Operator; -import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.utils.SetThreadName; @@ -81,7 +78,7 @@ public class LocalExecutionPlanner { return context.getPipelineDriverFactories(); } - public SchemaDriver plan( + public List<PipelineDriverFactory> plan( PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion) throws MemoryNotEnoughException { LocalExecutionPlanContext context = @@ -92,18 +89,12 @@ public class LocalExecutionPlanner { // check whether current free memory is enough to execute current query checkMemory(root, instanceContext.getStateMachine()); + context.addPipelineDriverFactory(root, context.getDriverContext()); + // set maxBytes one SourceHandle can reserve after visiting the whole tree context.setMaxBytesOneHandleCanReserve(); - ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator(); - context - .getDriverContext() - .getOperatorContexts() - .forEach( - operatorContext -> - operatorContext.setMaxRunTime(timeSliceAllocator.getMaxRunTime(operatorContext))); - - return new SchemaDriver(root, (SchemaDriverContext) context.getDriverContext()); + return context.getPipelineDriverFactories(); } private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index fd54e09b2c..845cb2be29 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2243,7 +2243,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP int maxDop = Math.min(context.getDegreeOfParallelism(), localChildren.size() + 1); int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildren.size()); - for (int i = 0; i < maxDop; i++) { + for (int i = 0; i < maxDop && i < localChildren.size(); i++) { // Only if dop >= size(children) + 1, split all children to new pipeline // Otherwise, the first group but not last will belong to the parent pipeline since the // children number of last group is greaterEqual than the first group diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java index fc21ed5c59..e71107b7af 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java @@ -20,8 +20,11 @@ package org.apache.iotdb.db.mpp.plan.planner; import org.apache.iotdb.db.mpp.execution.driver.DataDriver; +import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext; import org.apache.iotdb.db.mpp.execution.driver.Driver; import org.apache.iotdb.db.mpp.execution.driver.DriverContext; +import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver; +import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext; import org.apache.iotdb.db.mpp.execution.operator.Operator; import static java.util.Objects.requireNonNull; @@ -45,7 +48,12 @@ public class PipelineDriverFactory { public Driver createDriver() { requireNonNull(driverContext, "driverContext is null"); try { - Driver driver = new DataDriver(operation, driverContext); + Driver driver = null; + if (driverContext instanceof DataDriverContext) { + driver = new DataDriver(operation, driverContext); + } else { + driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext); + } if (dependencyPipelineIndex != -1) { driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex); }
