This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 44dbfc316935443432b874ee05c091f8d603704f Author: Alima777 <[email protected]> AuthorDate: Thu Feb 9 19:35:51 2023 +0800 implement consumeOneByOneChildren pipeline divided by dop --- .../iotdb/db/mpp/execution/driver/Driver.java | 15 +++- .../db/mpp/execution/driver/DriverContext.java | 9 ++ .../db/mpp/execution/schedule/DriverScheduler.java | 95 ++++++++++++++-------- .../db/mpp/execution/schedule/task/DriverTask.java | 11 +++ .../plan/planner/LocalExecutionPlanContext.java | 5 ++ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 +++++++++++++++++-- .../db/mpp/plan/planner/PipelineDriverFactory.java | 11 ++- 7 files changed, 184 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index 14554bb8fb..a1df81be83 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java @@ -29,11 +29,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.Duration; +import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; - import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -91,6 +90,10 @@ public abstract class Driver implements IDriver { return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone()); } + public DriverContext getDriverContext() { + return driverContext; + } + /** * do initialization * @@ -101,6 +104,14 @@ public abstract class Driver implements IDriver { /** release resource this driver used */ protected abstract void releaseResource(); + public boolean hasDependency() { + return driverContext.getDependencyDriverIndex() != -1; + } + + public int getDependencyDriverIndex() { + return driverContext.getDependencyDriverIndex(); + } + @Override public ListenableFuture<?> processFor(Duration duration) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java index 95a5861978..ea369d8fad 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java @@ -39,6 +39,7 @@ public class DriverContext { private final List<OperatorContext> operatorContexts = new ArrayList<>(); private ISinkHandle sinkHandle; private final RuleBasedTimeSliceAllocator timeSliceAllocator; + private int dependencyDriverIndex = -1; private final AtomicBoolean finished = new AtomicBoolean(); @@ -69,6 +70,14 @@ public class DriverContext { throw new UnsupportedOperationException(); } + public void setDependencyDriverIndex(int dependencyDriverIndex) { + this.dependencyDriverIndex = dependencyDriverIndex; + } + + public int getDependencyDriverIndex() { + return dependencyDriverIndex; + } + public void setSinkHandle(ISinkHandle sinkHandle) { this.sinkHandle = sinkHandle; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index 7188717d20..f959007648 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.driver.Driver; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService; @@ -39,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +55,6 @@ import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME; import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME; @@ -175,43 +177,67 @@ public class DriverScheduler implements IDriverScheduler, IService { getNextDriverTaskHandleId(), (MultilevelPriorityQueue) readyQueue, OptionalInt.of(Integer.MAX_VALUE)); - List<DriverTask> tasks = - drivers.stream() - .map( - v -> - new DriverTask( - v, - timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS, - DriverTaskStatus.READY, - driverTaskHandle)) - .collect(Collectors.toList()); + List<DriverTask> tasks = new ArrayList<>(); + drivers.forEach( + driver -> + tasks.add( + new DriverTask( + driver, + timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS, + DriverTaskStatus.READY, + driverTaskHandle))); + + List<DriverTask> submittedTasks = new ArrayList<>(); + for (DriverTask task : tasks) { + Driver driver = (Driver) task.getDriver(); + if (driver.hasDependency()) { + SettableFuture<?> blockedDependencyFuture = + tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver(); + blockedDependencyFuture.addListener( + () -> { + registerTaskToQueryMap(queryId, task); + submitTaskToReadyQueue(task); + }, + MoreExecutors.directExecutor()); + } else { + submittedTasks.add(task); + } + } + + for (DriverTask task : submittedTasks) { + registerTaskToQueryMap(queryId, task); + } + for (DriverTask task : submittedTasks) { + submitTaskToReadyQueue(task); + } + } + + public void registerTaskToQueryMap(QueryId queryId, DriverTask driverTask) { // If query has not been registered by other fragment instances, // add the first task as timeout checking task to timeoutQueue. - for (DriverTask driverTask : tasks) { - queryMap - .computeIfAbsent( - queryId, - v -> { - timeoutQueue.push(tasks.get(0)); - return new ConcurrentHashMap<>(); - }) - .computeIfAbsent( - driverTask.getDriverTaskId().getFragmentInstanceId(), - v -> Collections.synchronizedSet(new HashSet<>())) - .add(driverTask); - } + queryMap + .computeIfAbsent( + queryId, + v -> { + timeoutQueue.push(driverTask); + return new ConcurrentHashMap<>(); + }) + .computeIfAbsent( + driverTask.getDriverTaskId().getFragmentInstanceId(), + v -> Collections.synchronizedSet(new HashSet<>())) + .add(driverTask); + } - for (DriverTask task : tasks) { - task.lock(); - try { - if (task.getStatus() != DriverTaskStatus.READY) { - continue; - } - readyQueue.push(task); - task.setLastEnterReadyQueueTime(System.nanoTime()); - } finally { - task.unlock(); + public void submitTaskToReadyQueue(DriverTask task) { + task.lock(); + try { + if (task.getStatus() != DriverTaskStatus.READY) { + return; } + readyQueue.push(task); + task.setLastEnterReadyQueueTime(System.nanoTime()); + } finally { + task.unlock(); } } @@ -448,6 +474,7 @@ public class DriverScheduler implements IDriverScheduler, IService { task.updateSchedulePriority(context); task.setStatus(DriverTaskStatus.FINISHED); clearDriverTask(task); + task.submitDependencyDriver(); } finally { task.unlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java index e008b84210..b4c501a158 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTa import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.Duration; import java.util.Comparator; @@ -58,6 +59,8 @@ public class DriverTask implements IDIndexedAccessible { private long lastEnterReadyQueueTime; private long lastEnterBlockQueueTime; + private SettableFuture<?> blockedDependencyDriver = SettableFuture.create(); + /** Initialize a dummy instance for queryHolder */ public DriverTask() { this(new StubFragmentInstance(), 0L, null, null); @@ -137,6 +140,14 @@ public class DriverTask implements IDIndexedAccessible { this.abortCause = abortCause; } + public void submitDependencyDriver() { + this.blockedDependencyDriver.set(null); + } + + public SettableFuture<?> getBlockedDependencyDriver() { + return blockedDependencyDriver; + } + public Priority getPriority() { return priority.get(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java index 32988da5ca..3132783e3e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java @@ -98,6 +98,7 @@ public class LocalExecutionPlanContext { this.dataRegionTTL = parentContext.dataRegionTTL; this.nextPipelineId = parentContext.nextPipelineId; this.pipelineDriverFactories = parentContext.pipelineDriverFactories; + this.degreeOfParallelism = parentContext.degreeOfParallelism; this.exchangeSumNum = parentContext.exchangeSumNum; this.exchangeOperatorList = parentContext.exchangeOperatorList; this.cachedDataTypes = parentContext.cachedDataTypes; @@ -139,6 +140,10 @@ public class LocalExecutionPlanContext { return pipelineDriverFactories; } + public int getPipelineNumber() { + return nextPipelineId.get(); + } + public DriverContext getDriverContext() { return driverContext; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index ad2a5163ce..fd54e09b2c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2303,16 +2303,84 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP private List<Operator> dealWithConsumeChildrenOneByOneNode( PlanNode node, LocalExecutionPlanContext context) { + List<Operator> parentPipelineChildren = new ArrayList<>(); int originExchangeNum = context.getExchangeSumNum(); int finalExchangeNum = context.getExchangeSumNum(); - List<Operator> children = new ArrayList<>(); - for (PlanNode childSource : node.getChildren()) { - Operator childOperation = childSource.accept(this, context); - finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum()); - context.setExchangeSumNum(originExchangeNum); - children.add(childOperation); + + // 1. divide every child to pipeline using the max dop + if (context.getDegreeOfParallelism() == 1) { + // If dop = 1, we don't create extra pipeline + for (PlanNode childSource : node.getChildren()) { + Operator childOperation = childSource.accept(this, context); + finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum()); + context.setExchangeSumNum(originExchangeNum); + parentPipelineChildren.add(childOperation); + } + } else { + List<Integer> childPipelineNums = new ArrayList<>(); + int sumOfChildPipelines = 0; + int dependencyChildNode = 0, dependencyPipeId = 0; + for (PlanNode childNode : node.getChildren()) { + if (childNode instanceof ExchangeNode) { + Operator childOperation = childNode.accept(this, context); + finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum()); + context.setExchangeSumNum(originExchangeNum); + parentPipelineChildren.add(childOperation); + } else { + LocalExecutionPlanContext subContext = context.createSubContext(); + // Only context.getDegreeOfParallelism() - 1 can be allocated to child + int dopForChild = context.getDegreeOfParallelism() - 1; + subContext.setDegreeOfParallelism(dopForChild); + int originPipeNum = context.getPipelineNumber(); + Operator childOperation = childNode.accept(this, subContext); + ISinkHandle localSinkHandle = + MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline( + // Attention, there is no parent node, use first child node instead + context.getDriverContext(), childNode.getPlanNodeId().getId()); + subContext.setSinkHandle(localSinkHandle); + subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext()); + + int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum; + childPipelineNums.add(curChildPipelineNum); + sumOfChildPipelines += curChildPipelineNum; + // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish + if (sumOfChildPipelines > dopForChild) { + // Update dependencyPipeId, after which finishes we can submit curChildPipeline + while (sumOfChildPipelines > dopForChild) { + dependencyPipeId = context.getPipelineNumber() - sumOfChildPipelines; + sumOfChildPipelines -= childPipelineNums.get(dependencyChildNode); + dependencyChildNode++; + } + } + // Add dependency for all pipelines under current node + if (dependencyChildNode != 0) { + for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) { + context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId); + } + } + + ExchangeOperator sourceOperator = + new ExchangeOperator( + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + null, + ExchangeOperator.class.getSimpleName()), + MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline( + ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(), + context.getDriverContext()), + childNode.getPlanNodeId()); + context + .getTimeSliceAllocator() + .recordExecutionWeight(sourceOperator.getOperatorContext(), 1); + parentPipelineChildren.add(sourceOperator); + context.addExchangeOperator(sourceOperator); + finalExchangeNum = Math.max(finalExchangeNum, subContext.getExchangeSumNum() + 1); + } + } } context.setExchangeSumNum(finalExchangeNum); - return children; + return parentPipelineChildren; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java index ec5eb7a3e2..fc21ed5c59 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java @@ -31,6 +31,7 @@ public class PipelineDriverFactory { private final DriverContext driverContext; // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline private final Operator operation; + private int dependencyPipelineIndex = -1; public PipelineDriverFactory(Operator operation, DriverContext driverContext) { this.operation = requireNonNull(operation, "rootOperator is null"); @@ -44,7 +45,11 @@ public class PipelineDriverFactory { public Driver createDriver() { requireNonNull(driverContext, "driverContext is null"); try { - return new DataDriver(operation, driverContext); + Driver driver = new DataDriver(operation, driverContext); + if (dependencyPipelineIndex != -1) { + driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex); + } + return driver; } catch (Throwable failure) { try { operation.close(); @@ -56,4 +61,8 @@ public class PipelineDriverFactory { throw failure; } } + + public void setDependencyPipeline(int dependencyPipelineIndex) { + this.dependencyPipelineIndex = dependencyPipelineIndex; + } }
