This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch registerLazy in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 47033d5231a1c27ac9bfc0740875507ec5ef83f8 Author: Alima777 <[email protected]> AuthorDate: Thu Apr 6 15:56:35 2023 +0800 Only when the upstream operator is closed, then dependency driver can be submitted --- .../db/mpp/execution/driver/DriverContext.java | 18 ++++++++++++++++ .../iotdb/db/mpp/execution/driver/IDriver.java | 2 +- .../mpp/execution/exchange/SharedTsBlockQueue.java | 25 ++++++++++++++++++---- .../operator/source/ExchangeOperator.java | 13 +++++++++++ .../db/mpp/execution/schedule/DriverScheduler.java | 7 +++--- .../db/mpp/execution/schedule/task/DriverTask.java | 18 ++++------------ .../plan/planner/LocalExecutionPlanContext.java | 4 ++++ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 3 ++- .../db/mpp/plan/planner/PipelineDriverFactory.java | 15 +++++++------ .../execution/schedule/DriverSchedulerTest.java | 9 ++++---- 10 files changed, 79 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java index 58a131545f..15c6dcc232 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java @@ -18,9 +18,11 @@ */ package org.apache.iotdb.db.mpp.execution.driver; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId; import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; @@ -39,10 +41,18 @@ public class DriverContext { private final List<OperatorContext> operatorContexts = new ArrayList<>(); private ISink sink; private final RuleBasedTimeSliceAllocator timeSliceAllocator; + private int dependencyDriverIndex = -1; + private ExchangeOperator upstreamOperator; private final AtomicBoolean finished = new AtomicBoolean(); + @TestOnly + public DriverContext() { + this.fragmentInstanceContext = null; + this.timeSliceAllocator = null; + } + public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) { this.fragmentInstanceContext = fragmentInstanceContext; this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId); @@ -78,6 +88,14 @@ public class DriverContext { return dependencyDriverIndex; } + public void setUpstreamOperator(ExchangeOperator upstreamOperator) { + this.upstreamOperator = upstreamOperator; + } + + public ExchangeOperator getUpstreamOperator() { + return upstreamOperator; + } + public void setSink(ISink sink) { this.sink = sink; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java index 88514bddcc..a06ba08c52 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java @@ -72,5 +72,5 @@ public interface IDriver { /** @return get Sink of current IDriver */ ISink getSink(); - int getDependencyDriverIndex(); + DriverContext getDriverContext(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 34858dfaa4..6b1c2396a8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -73,6 +73,7 @@ public class SharedTsBlockQueue { private ListenableFuture<Void> blockedOnMemory; private boolean closed = false; + private boolean alreadyRegistered = false; private LocalSourceHandle sourceHandle; private LocalSinkChannel sinkChannel; @@ -80,6 +81,9 @@ public class SharedTsBlockQueue { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance(); + // When the sink channel of a pipeline driver closes, all dependency drivers can be submitted + private SettableFuture<Void> blockedDependencyDriver = null; + public SharedTsBlockQueue( TFragmentInstanceId fragmentInstanceId, String planNodeId, @@ -91,10 +95,6 @@ public class SharedTsBlockQueue { this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null"); this.localMemoryManager = Validate.notNull(localMemoryManager, "local memory manager cannot be null"); - localMemoryManager - .getQueryPool() - .registerPlanNodeIdToQueryMemoryMap( - fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId); } public boolean hasNoMoreTsBlocks() { @@ -207,6 +207,12 @@ public class SharedTsBlockQueue { Validate.notNull(tsBlock, "TsBlock cannot be null"); Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full"); + if (!alreadyRegistered) { + localMemoryManager + .getQueryPool() + .registerPlanNodeIdToQueryMemoryMap( + localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId); + } Pair<ListenableFuture<Void>, Boolean> pair = localMemoryManager .getQueryPool() @@ -264,6 +270,17 @@ public class SharedTsBlockQueue { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + // Dependency driver must be submitted before this task is cleared + if (blockedDependencyDriver != null) { + this.blockedDependencyDriver.set(null); + } + } + + public SettableFuture<Void> getBlockedDependencyDriver() { + if (blockedDependencyDriver == null) { + blockedDependencyDriver = SettableFuture.create(); + } + return blockedDependencyDriver; } /** Destroy the queue and cancel the future. Should only be called in abnormal case */ diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java index 27ef9a3022..de23d93fb8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; @@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator { private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + private SettableFuture<Void> blockedDependencyDriver = null; + public ExchangeOperator( OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) { this.operatorContext = operatorContext; @@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator { @Override public void close() throws Exception { sourceHandle.close(); + if (blockedDependencyDriver != null) { + blockedDependencyDriver.set(null); + } + } + + public SettableFuture<Void> getBlockedDependencyDriver() { + if (blockedDependencyDriver == null) { + blockedDependencyDriver = SettableFuture.create(); + } + return blockedDependencyDriver; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index 6cd893f441..dc3fa2e959 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService { List<DriverTask> submittedTasks = new ArrayList<>(); for (DriverTask task : tasks) { IDriver driver = task.getDriver(); - if (driver.getDependencyDriverIndex() != -1) { + int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex(); + if (dependencyDriverIndex != -1) { SettableFuture<?> blockedDependencyFuture = - tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver(); + tasks.get(dependencyDriverIndex).getBlockedDependencyDriver(); blockedDependencyFuture.addListener( () -> { // Only if query is alive, we can submit this task @@ -485,8 +486,6 @@ public class DriverScheduler implements IDriverScheduler, IService { } finally { task.unlock(); } - // Dependency driver must be submitted before this task is cleared - task.submitDependencyDriver(); task.lock(); try { clearDriverTask(task); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java index 20334d4c44..4768d222a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink; import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread; @@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible { private long lastEnterReadyQueueTime; private long lastEnterBlockQueueTime; - private SettableFuture<Void> blockedDependencyDriver = null; - /** Initialize a dummy instance for queryHolder */ public DriverTask() { this(new StubFragmentInstance(), 0L, null, null); @@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible { this.abortCause = abortCause; } - public void submitDependencyDriver() { - if (blockedDependencyDriver != null) { - this.blockedDependencyDriver.set(null); - } - } - public SettableFuture<Void> getBlockedDependencyDriver() { - if (blockedDependencyDriver == null) { - blockedDependencyDriver = SettableFuture.create(); - } - return blockedDependencyDriver; + return driver.getDriverContext().getUpstreamOperator().getBlockedDependencyDriver(); } public Priority getPriority() { @@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible { } @Override - public int getDependencyDriverIndex() { - return -1; + public DriverContext getDriverContext() { + return null; } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java index 3dfde552ca..01a42de6fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java @@ -144,6 +144,10 @@ public class LocalExecutionPlanContext { return pipelineDriverFactories; } + public PipelineDriverFactory getCurrentPipelineDriverFactory() { + return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1); + } + public int getPipelineNumber() { return pipelineDriverFactories.size(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 1083fafcfc..a8c8e3ab84 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2597,7 +2597,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // Add dependency for all pipelines under current node if (dependencyChildNode != 0) { for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) { - context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId); + context.getPipelineDriverFactories().get(i).setDependencyPipeIndex(dependencyPipeId); } } @@ -2614,6 +2614,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getDriverContext()), childNode.getPlanNodeId(), childOperation.calculateMaxReturnSize()); + context.getCurrentPipelineDriverFactory().setUpstreamOperator(sourceOperator); context .getTimeSliceAllocator() .recordExecutionWeight(sourceOperator.getOperatorContext(), 1); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java index 8cc9ec0e51..07626548e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver; import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext; import org.apache.iotdb.db.mpp.execution.operator.Operator; +import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import static java.util.Objects.requireNonNull; @@ -34,7 +35,6 @@ public class PipelineDriverFactory { private final DriverContext driverContext; // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline private final Operator operation; - private int dependencyPipelineIndex = -1; public PipelineDriverFactory(Operator operation, DriverContext driverContext) { this.operation = requireNonNull(operation, "rootOperator is null"); @@ -54,9 +54,6 @@ public class PipelineDriverFactory { } else { driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext); } - if (dependencyPipelineIndex != -1) { - driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex); - } return driver; } catch (Throwable failure) { try { @@ -70,11 +67,15 @@ public class PipelineDriverFactory { } } - public void setDependencyPipeline(int dependencyPipelineIndex) { - this.dependencyPipelineIndex = dependencyPipelineIndex; + public void setDependencyPipeIndex(int dependencyDriverIndex) { + this.driverContext.setDependencyDriverIndex(dependencyDriverIndex); } public int getDependencyPipelineIndex() { - return dependencyPipelineIndex; + return this.driverContext.getDependencyDriverIndex(); + } + + public void setUpstreamOperator(ExchangeOperator exchangeOperator) { + this.driverContext.setUpstreamOperator(exchangeOperator); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java index ff8d0fdebf..1f26df098c 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; @@ -63,12 +64,12 @@ public class DriverSchedulerTest { DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0); IDriver mockDriver1 = Mockito.mock(IDriver.class); Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1); - Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1); + Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext()); FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1"); DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0); IDriver mockDriver2 = Mockito.mock(IDriver.class); Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2); - Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1); + Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext()); List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2); manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); @@ -93,7 +94,7 @@ public class DriverSchedulerTest { FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2"); DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0); Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3); - Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1); + Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext()); manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); Assert.assertEquals(1, manager.getQueryMap().size()); @@ -114,7 +115,7 @@ public class DriverSchedulerTest { DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0); IDriver mockDriver4 = Mockito.mock(IDriver.class); Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4); - Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1); + Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext()); manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); Assert.assertEquals(2, manager.getQueryMap().size());
