This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch pipelinehierarchyTree in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7992acd1b49da8db36aaf9b79003e19b2ae74b3a Author: lancelly <[email protected]> AuthorDate: Tue Apr 23 15:12:49 2024 +0800 finish --- .../plan/planner/LocalExecutionPlanContext.java | 50 ++ .../plan/planner/LocalExecutionPlanner.java | 16 +- .../plan/planner/OperatorTreeGenerator.java | 21 +- .../ConsumeAllChildrenPipelineMemoryEstimator.java | 38 ++ ...umeChildrenOneByOnePipelineMemoryEstimator.java | 74 +++ .../planner/memory/PipelineMemoryEstimator.java | 68 +++ .../memory/PipelineMemoryEstimatorFactory.java | 51 ++ .../plan/planner/plan/node/PlanNodeType.java | 2 + .../node/process/join/FullOuterTimeJoinNode.java | 5 + .../plan/planner/PipelineBuilderTest.java | 578 +++++++++++++++++++++ 10 files changed, 896 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java index 5d9a2f8f46d..9a55d298a0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.queryengine.plan.planner; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext; @@ -30,6 +31,10 @@ import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; +import org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimator; +import 
org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimatorFactory; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -41,9 +46,12 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.time.ZoneId; import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -91,6 +99,10 @@ public class LocalExecutionPlanContext { // use AtomicReference not for thread-safe, just for updating same field in different pipeline private AtomicReference<List<Long>> timePartitions = new AtomicReference<>(); + /** Records the parent of each pipeline. The order of each list does not matter for now. */ + private Map<PlanNodeId, List<PipelineMemoryEstimator>> parentPlanNodeIdToMemoryEstimator = + new ConcurrentHashMap<>(); + // for data region public LocalExecutionPlanContext( TypeProvider typeProvider, @@ -122,6 +134,7 @@ public class LocalExecutionPlanContext { parentContext.getDriverContext().createSubDriverContext(getNextPipelineId()); this.dataNodeQueryContext = parentContext.dataNodeQueryContext; this.timePartitions = parentContext.timePartitions; + this.parentPlanNodeIdToMemoryEstimator = parentContext.parentPlanNodeIdToMemoryEstimator; } // for schema region @@ -146,6 +159,38 @@ public class LocalExecutionPlanContext { new PipelineDriverFactory(operation, driverContext, estimatedMemorySize)); } + /** + * Each time we construct a pipeline, we should also construct the memory estimator for the + * pipeline. 
+ * + * @param operation the root operator of the pipeline + * @param parentPlanNodeId the parent plan node id of the root of the pipeline + * @param root the root node of the pipeline + * @param dependencyPipelineIndex the index of the dependency pipeline, -1 if no dependency + */ + public PipelineMemoryEstimator constructPipelineMemoryEstimator( + final Operator operation, + @Nullable final PlanNodeId parentPlanNodeId, + final PlanNode root, + final int dependencyPipelineIndex) { + PipelineMemoryEstimator currentPipelineMemoryEstimator = + PipelineMemoryEstimatorFactory.createPipelineMemoryEstimator( + operation, root, dependencyPipelineIndex); + // As OperatorTreeGenerator traverse the tree in a post-order way, all the children of current + // pipeline have been recorded in the map. + List<PipelineMemoryEstimator> childrenMemoryEstimators = + parentPlanNodeIdToMemoryEstimator.get(root.getPlanNodeId()); + if (childrenMemoryEstimators != null) { + currentPipelineMemoryEstimator.addChildren(childrenMemoryEstimators); + } + if (parentPlanNodeId != null) { + parentPlanNodeIdToMemoryEstimator + .computeIfAbsent(parentPlanNodeId, k -> new LinkedList<>()) + .add(currentPipelineMemoryEstimator); + } + return currentPipelineMemoryEstimator; + } + public LocalExecutionPlanContext createSubContext() { return new LocalExecutionPlanContext(this); } @@ -305,4 +350,9 @@ public class LocalExecutionPlanContext { public TemplatedInfo getTemplatedInfo() { return typeProvider.getTemplatedInfo(); } + + @TestOnly + public Map<PlanNodeId, List<PipelineMemoryEstimator>> getParentPlanNodeIdToMemoryEstimator() { + return parentPlanNodeIdToMemoryEstimator; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 6acd4e1b716..2b6c4e173fb 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateM import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; +import org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.utils.SetThreadName; @@ -82,8 +83,11 @@ public class LocalExecutionPlanner { // TODO Replace operator with operatorFactory to build multiple driver for one pipeline Operator root = plan.accept(new OperatorTreeGenerator(), context); + PipelineMemoryEstimator memoryEstimator = + context.constructPipelineMemoryEstimator(root, null, plan, -1); + // check whether current free memory is enough to execute current query - long estimatedMemorySize = checkMemory(root, instanceContext.getStateMachine()); + long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext.getStateMachine()); context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); @@ -105,8 +109,11 @@ public class LocalExecutionPlanner { Operator root = plan.accept(new OperatorTreeGenerator(), context); + PipelineMemoryEstimator memoryEstimator = + context.constructPipelineMemoryEstimator(root, null, plan, -1); + // check whether current free memory is enough to execute current query - checkMemory(root, instanceContext.getStateMachine()); + checkMemory(memoryEstimator, instanceContext.getStateMachine()); context.addPipelineDriverFactory(root, context.getDriverContext(), 0); @@ -116,7 +123,8 @@ public class 
LocalExecutionPlanner { return context.getPipelineDriverFactories(); } - private long checkMemory(Operator root, FragmentInstanceStateMachine stateMachine) + private long checkMemory( + final PipelineMemoryEstimator memoryEstimator, FragmentInstanceStateMachine stateMachine) throws MemoryNotEnoughException { // if it is disabled, just return @@ -125,7 +133,7 @@ public class LocalExecutionPlanner { return 0; } - long estimatedMemorySize = root.calculateMaxPeekMemoryWithCounter(); + long estimatedMemorySize = memoryEstimator.calculateEstimatedMemorySize(); QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize); synchronized (this) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index a6d043f5ad2..8e46e4ec6ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -154,6 +154,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimatorFactory; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -2987,7 +2988,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP subContext.setDegreeOfParallelism(dopForChild); int originPipeNum = 
context.getPipelineNumber(); - Operator sourceOperator = createNewPipelineForChildNode(context, subContext, childNode); + Operator sourceOperator = + createNewPipelineForChildNode(context, subContext, childNode, node.getPlanNodeId()); parentPipelineChildren.add(sourceOperator); dopForChild = Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 - originPipeNum)); @@ -3029,7 +3031,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } Operator sourceOperator = - createNewPipelineForChildNode(context, subContext, partialParentNode); + createNewPipelineForChildNode( + context, subContext, partialParentNode, node.getPlanNodeId()); parentPipelineChildren.add(sourceOperator); afterwardsNodes.add(partialParentNode); finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1; @@ -3091,7 +3094,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } private Operator createNewPipelineForChildNode( - LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) { + LocalExecutionPlanContext context, + LocalExecutionPlanContext subContext, + PlanNode childNode, + PlanNodeId parentNodeId) { Operator childOperation = childNode.accept(this, subContext); ISinkChannel localSinkChannel = MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline( @@ -3099,6 +3105,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP subContext.getDriverContext(), childNode.getPlanNodeId().getId()); subContext.setISink(localSinkChannel); subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext(), 0); + subContext.constructPipelineMemoryEstimator(childOperation, parentNodeId, childNode, -1); ExchangeOperator sourceOperator = new ExchangeOperator( @@ -3118,6 +3125,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP public List<Operator> dealWithConsumeChildrenOneByOneNode( PlanNode 
node, LocalExecutionPlanContext context) { + checkArgument(PipelineMemoryEstimatorFactory.isConsumeChildrenOneByOneNode(node)); List<Operator> parentPipelineChildren = new ArrayList<>(); int originExchangeNum = context.getExchangeSumNum(); int finalExchangeNum = context.getExchangeSumNum(); @@ -3162,6 +3170,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // actually is dop int curChildPipelineNum = Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum); + childPipelineNums.add(curChildPipelineNum); sumOfChildPipelines += curChildPipelineNum; // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish @@ -3182,6 +3191,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } } + subContext.constructPipelineMemoryEstimator( + childOperation, + node.getPlanNodeId(), + childNode, + dependencyChildNode == 0 ? -1 : dependencyPipeId); + ExchangeOperator sourceOperator = new ExchangeOperator( context diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeAllChildrenPipelineMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeAllChildrenPipelineMemoryEstimator.java new file mode 100644 index 00000000000..d31c5873d60 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeAllChildrenPipelineMemoryEstimator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; + +public class ConsumeAllChildrenPipelineMemoryEstimator extends PipelineMemoryEstimator { + + public ConsumeAllChildrenPipelineMemoryEstimator( + final Operator root, final int dependencyPipelineIndex) { + super(root, dependencyPipelineIndex); + } + + @Override + public long calculateEstimatedMemorySize() { + return root.calculateMaxPeekMemoryWithCounter() + + children.stream() + .map(PipelineMemoryEstimator::calculateEstimatedMemorySize) + .reduce(0L, Long::sum); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeChildrenOneByOnePipelineMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeChildrenOneByOnePipelineMemoryEstimator.java new file mode 100644 index 00000000000..71c431fea77 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeChildrenOneByOnePipelineMemoryEstimator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; + +public class ConsumeChildrenOneByOnePipelineMemoryEstimator extends PipelineMemoryEstimator { + + private long concurrentRunningChildrenNum = -1; + + private boolean concurrentRunningChildrenNumInitialized = false; + + public ConsumeChildrenOneByOnePipelineMemoryEstimator( + final Operator root, final int dependencyPipelineIndex) { + super(root, dependencyPipelineIndex); + } + + @Override + public long calculateEstimatedMemorySize() { + // EstimatedSize = root.calculateMaxPeekMemoryWithCounter() + sum(children's estimated size) * + // runningChildrenNum / children.size() + retainedSize of not running children + return children.isEmpty() + ? 
root.calculateMaxPeekMemoryWithCounter() + : (long) + (root.calculateMaxPeekMemoryWithCounter() + + children.stream() + .map(PipelineMemoryEstimator::calculateEstimatedMemorySize) + .reduce(0L, Long::sum) + / (double) (children.size()) + * getConcurrentRunningChildrenNum() + + children.stream() + .map(PipelineMemoryEstimator::calculateRetainedMemorySize) + .reduce(0L, Long::sum) + / (double) (children.size()) + * (children.size() - getConcurrentRunningChildrenNum())); + } + + private long getConcurrentRunningChildrenNum() { + if (concurrentRunningChildrenNumInitialized) { + return concurrentRunningChildrenNum; + } + concurrentRunningChildrenNum = + children.stream() + .filter(memoryEstimator -> memoryEstimator.getDependencyPipelineIndex() == -1) + .count(); + concurrentRunningChildrenNumInitialized = true; + return concurrentRunningChildrenNum; + } + + @TestOnly + public long getConcurrentRunningChildrenNumForTest() { + return children.stream() + .filter(memoryEstimator -> memoryEstimator.getDependencyPipelineIndex() == -1) + .count(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimator.java new file mode 100644 index 00000000000..cdf16571ca3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; + +import java.util.LinkedList; +import java.util.List; + +public abstract class PipelineMemoryEstimator { + + protected final List<PipelineMemoryEstimator> children; + + protected final Operator root; + + protected final int dependencyPipelineIndex; + + protected PipelineMemoryEstimator(final Operator root, final int dependencyPipelineIndex) { + this.root = root; + this.dependencyPipelineIndex = dependencyPipelineIndex; + this.children = new LinkedList<>(); + } + /** Calculate the estimated memory size of the pipeline. 
*/ + public abstract long calculateEstimatedMemorySize(); + + protected long calculateRetainedMemorySize() { + return root.calculateRetainedSizeAfterCallingNext() + + children.stream() + .map(PipelineMemoryEstimator::calculateRetainedMemorySize) + .reduce(0L, Long::sum); + } + + public void addChildren(final List<PipelineMemoryEstimator> child) { + children.addAll(child); + } + + protected int getDependencyPipelineIndex() { + return dependencyPipelineIndex; + } + + @TestOnly + public List<PipelineMemoryEstimator> getChildren() { + return children; + } + + @TestOnly + public Operator getRoot() { + return root; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimatorFactory.java new file mode 100644 index 00000000000..ea637b586e4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimatorFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; + +public class PipelineMemoryEstimatorFactory { + + private PipelineMemoryEstimatorFactory() { + // hidden constructor + } + + public static PipelineMemoryEstimator createPipelineMemoryEstimator( + final Operator rootOperator, final PlanNode root, final int dependencyPipelineIndex) { + return isConsumeChildrenOneByOneNode(root) + ? new ConsumeChildrenOneByOnePipelineMemoryEstimator(rootOperator, dependencyPipelineIndex) + : new ConsumeAllChildrenPipelineMemoryEstimator(rootOperator, dependencyPipelineIndex); + } + + public static boolean isConsumeChildrenOneByOneNode(final PlanNode node) { + switch (node.getType()) { + case SCHEMA_QUERY_MERGE: + case COUNT_MERGE: + case DEVICE_VIEW: + case IDENTITY_SINK: + case SCHEMA_FETCH_MERGE: + case LAST_QUERY_COLLECT: + return true; + default: + return false; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index bda66843a97..6830a369a6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -209,6 +209,8 @@ public enum PlanNodeType { EXPLAIN_ANALYZE((short) 90), PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91), + + FULL_OUTER_TIME_JOIN_REFERENCE((short) 92), ; public static final int BYTES = Short.BYTES; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java index 750bc2bfc00..5ca6c352da1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java @@ -140,4 +140,9 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { public int hashCode() { return Objects.hash(super.hashCode(), mergeOrder); } + + @Override + public PlanNodeType getType() { + return PlanNodeType.FULL_OUTER_TIME_JOIN; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java index 2ec572ede00..c7005a696a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java @@ -34,11 +34,15 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; +import org.apache.iotdb.db.queryengine.plan.planner.memory.ConsumeAllChildrenPipelineMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.planner.memory.ConsumeChildrenOneByOnePipelineMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; @@ -48,6 +52,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDevi import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; @@ -72,6 +77,7 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey.DEVICE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class PipelineBuilderTest { @@ -108,6 +114,18 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(0, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try (Operator root = fullOuterTimeJoinNode.accept(operatorTreeGenerator, context)) { + 
PipelineMemoryEstimator pipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, fullOuterTimeJoinNode, -1); + assertEquals( + root.calculateMaxPeekMemoryWithCounter(), + pipelineMemoryEstimator.calculateEstimatedMemorySize()); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -160,6 +178,38 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(1, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try (Operator root = + fullOuterTimeJoinNode.accept( + operatorTreeGenerator, createLocalExecutionPlanContext(typeProvider))) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, fullOuterTimeJoinNode, -1); + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, rootPipelineMemoryEstimator.getClass()); + // test calculateEstimatedMemory + assertEquals( + root.calculateMaxPeekMemoryWithCounter() + + rootPipelineMemoryEstimator.getChildren().stream() + .map(PipelineMemoryEstimator::calculateEstimatedMemorySize) + .reduce(0L, Long::sum), + rootPipelineMemoryEstimator.calculateEstimatedMemorySize()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(1, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 1; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + FullOuterTimeJoinOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -218,6 +268,36 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(2, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try (Operator root = + fullOuterTimeJoinNode.accept( + operatorTreeGenerator, 
createLocalExecutionPlanContext(typeProvider))) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, fullOuterTimeJoinNode, -1); + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(2, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 2; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + if (i == 0) { + assertEquals( + SeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } else { + assertEquals( + FullOuterTimeJoinOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -275,6 +355,29 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(3, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try (Operator root = + fullOuterTimeJoinNode.accept( + operatorTreeGenerator, createLocalExecutionPlanContext(typeProvider))) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, fullOuterTimeJoinNode, -1); + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(3, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 3; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + SeriesScanOperator.class, childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + 
fail(); + } } /** @@ -337,6 +440,29 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(4, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try (Operator root = + fullOuterTimeJoinNode.accept( + operatorTreeGenerator, createLocalExecutionPlanContext(typeProvider))) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, fullOuterTimeJoinNode, -1); + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + SeriesScanOperator.class, childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -399,6 +525,29 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(4, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try (Operator root = + fullOuterTimeJoinNode.accept( + operatorTreeGenerator, createLocalExecutionPlanContext(typeProvider))) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, fullOuterTimeJoinNode, -1); + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + 
SeriesScanOperator.class, childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -479,6 +628,21 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(0, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(null, null, deviceViewNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(0, childrenPipelineMemoryEstimators.size()); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -535,6 +699,29 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(1, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(null, null, deviceViewNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + AlignedSeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -591,6 +778,29 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(2, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try { + 
PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(null, null, deviceViewNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + AlignedSeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -647,6 +857,30 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(3, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(null, null, deviceViewNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + AlignedSeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -703,6 +937,30 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(4, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + 
context.constructPipelineMemoryEstimator(null, null, deviceViewNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + AlignedSeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -759,6 +1017,30 @@ public class PipelineBuilderTest { // Validate the number exchange operator assertEquals(4, context.getExchangeSumNum()); + + // Validate PipelineMemoryEstimator + try { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(null, null, deviceViewNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(4, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 4; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + AlignedSeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } } /** @@ -1038,6 +1320,268 @@ public class PipelineBuilderTest { assertEquals(3, context.getExchangeSumNum()); } + @Test + public void testIdentitySinkNodeMemoryEstimatorWithDop1() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider); + 
LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(1); + + // Validate PipelineMemoryEstimator + try (Operator root = identitySinkNode.accept(operatorTreeGenerator, context)) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, identitySinkNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + assertEquals( + root.calculateMaxPeekMemoryWithCounter(), + rootPipelineMemoryEstimator.calculateEstimatedMemorySize()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(0, childrenPipelineMemoryEstimators.size()); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + /** + * The operator structure is: + * + * <p>IdentitySinkOperator - [ExchangeOperator1, ExchangeOperator2, ExchangeOperator3] + * + * <p>This test will test dop = 2. Expected result is four pipelines with dependency: + * + * <p>The pipeline0 is: ExchangeOperator1 - FullOuterTimeJoinOperator1. + * + * <p>The pipeline1 is: ExchangeOperator2 - FullOuterTimeJoinOperator2, which has dependency 0. + * + * <p>The pipeline2 is: ExchangeOperator3 - FullOuterTimeJoinOperator3, which has dependency 1. 
+ * + * <p>The pipeline3 is: IdentitySinkOperator - [ExchangeOperator1, ExchangeOperator2, + * ExchangeOperator3] + */ + @Test + public void testIdentitySinkNodeMemoryEstimatorWithDop2() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider); + LocalExecutionPlanContext context = + createLocalExecutionPlanContextWithQueryId(typeProvider, new QueryId("test_dop2")); + context.setDegreeOfParallelism(2); + + // Validate PipelineMemoryEstimator + try (Operator root = identitySinkNode.accept(operatorTreeGenerator, context)) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, identitySinkNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + assertEquals( + root.calculateMaxPeekMemoryWithCounter() + + rootPipelineMemoryEstimator.getChildren().stream() + .map(PipelineMemoryEstimator::calculateEstimatedMemorySize) + .reduce(0L, Long::sum) + / 3 + + rootPipelineMemoryEstimator + .getChildren() + .get(1) + .getRoot() + .calculateRetainedSizeAfterCallingNext() + * 2, + rootPipelineMemoryEstimator.calculateEstimatedMemorySize()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(3, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 3; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + FullOuterTimeJoinOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + assertEquals(0, childrenPipelineMemoryEstimators.get(i).getChildren().size()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + /** + * The operator structure is: + * + * <p>IdentitySinkOperator - [ExchangeOperator - FullOuterTimeJoinOperator1 - + * 
[SeriesScanOperator1, ExchangeOperator, ExchangeOperator], ExchangeOperator - + * FullOuterTimeJoinOperator2 - [SeriesScanOperator4, ExchangeOperator, ExchangeOperator], + * ExchangeOperator - FullOuterTimeJoinOperator3 - [SeriesScanOperator7, ExchangeOperator, + * ExchangeOperator]] + * + * <p>This test will test dop = 4. Expected result is ten pipelines with dependency: + * + * <p>The pipeline0 is: ExchangeOperator - SeriesScanOperator2. + * + * <p>The pipeline1 is: ExchangeOperator - SeriesScanOperator3. + * + * <p>The pipeline2 is: ExchangeOperator - FullOuterTimeJoinOperator1 - [SeriesScanOperator1, + * ExchangeOperator, ExchangeOperator] + * + * <p>The pipeline3 is: ExchangeOperator - SeriesScanOperator5, which has dependency 2. + * + * <p>The pipeline4 is: ExchangeOperator - SeriesScanOperator6, which has dependency 2. + * + * <p>The pipeline5 is: ExchangeOperator - FullOuterTimeJoinOperator2 - [SeriesScanOperator4, + * ExchangeOperator, ExchangeOperator], which has dependency 2. + * + * <p>The pipeline7 is: ExchangeOperator - SeriesScanOperator8, which has dependency 5. + * + * <p>The pipeline8 is: ExchangeOperator - SeriesScanOperator9, which has dependency 5. + * + * <p>The pipeline9 is: ExchangeOperator - FullOuterTimeJoinOperator3 - [SeriesScanOperator7, + * ExchangeOperator, ExchangeOperator], which has dependency 5. 
+ * + * <p>The pipeline10 is: IdentitySinkOperator - [ExchangeOperator1, ExchangeOperator2, + * ExchangeOperator3] + */ + @Test + public void testIdentitySinkNodeMemoryEstimatorWithDop4() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider); + LocalExecutionPlanContext context = + createLocalExecutionPlanContextWithQueryId(typeProvider, new QueryId("test_dop4")); + context.setDegreeOfParallelism(4); + + // Validate PipelineMemoryEstimator + try (Operator root = identitySinkNode.accept(operatorTreeGenerator, context)) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, identitySinkNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + assertEquals( + 1, + ((ConsumeChildrenOneByOnePipelineMemoryEstimator) rootPipelineMemoryEstimator) + .getConcurrentRunningChildrenNumForTest()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(3, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 3; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + FullOuterTimeJoinOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + assertEquals(2, childrenPipelineMemoryEstimators.get(i).getChildren().size()); + for (int j = 0; j < 2; j++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getClass()); + assertEquals( + SeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getRoot().getClass()); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testIdentitySinkNodeMemoryEstimatorWithDop8() 
throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider); + LocalExecutionPlanContext context = + createLocalExecutionPlanContextWithQueryId(typeProvider, new QueryId("test_dop16")); + context.setDegreeOfParallelism(8); + + // Validate PipelineMemoryEstimator + try (Operator root = identitySinkNode.accept(operatorTreeGenerator, context)) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, identitySinkNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + assertEquals( + 1, + ((ConsumeChildrenOneByOnePipelineMemoryEstimator) rootPipelineMemoryEstimator) + .getConcurrentRunningChildrenNumForTest()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(3, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 3; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + FullOuterTimeJoinOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + for (int j = 0; j < 3; j++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getClass()); + assertEquals( + SeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getRoot().getClass()); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testIdentitySinkNodeMemoryEstimatorWithDop16() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider); + LocalExecutionPlanContext context = + createLocalExecutionPlanContextWithQueryId(typeProvider, new 
QueryId("test_dop8")); + context.setDegreeOfParallelism(16); + + // Validate PipelineMemoryEstimator + try (Operator root = identitySinkNode.accept(operatorTreeGenerator, context)) { + PipelineMemoryEstimator rootPipelineMemoryEstimator = + context.constructPipelineMemoryEstimator(root, null, identitySinkNode, -1); + assertEquals( + ConsumeChildrenOneByOnePipelineMemoryEstimator.class, + rootPipelineMemoryEstimator.getClass()); + // all the pipeline under this node will be executed concurrently + assertEquals( + 3, + ((ConsumeChildrenOneByOnePipelineMemoryEstimator) rootPipelineMemoryEstimator) + .getConcurrentRunningChildrenNumForTest()); + + List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators = + rootPipelineMemoryEstimator.getChildren(); + assertEquals(3, childrenPipelineMemoryEstimators.size()); + for (int i = 0; i < 3; i++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getClass()); + assertEquals( + FullOuterTimeJoinOperator.class, + childrenPipelineMemoryEstimators.get(i).getRoot().getClass()); + for (int j = 0; j < 3; j++) { + assertEquals( + ConsumeAllChildrenPipelineMemoryEstimator.class, + childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getClass()); + assertEquals( + SeriesScanOperator.class, + childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getRoot().getClass()); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) { ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); @@ -1056,6 +1600,24 @@ public class PipelineBuilderTest { typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1)); } + private LocalExecutionPlanContext createLocalExecutionPlanContextWithQueryId( + TypeProvider typeProvider, QueryId queryId) { + ExecutorService instanceNotificationExecutor = + 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + fragmentInstanceContext.setDataRegion(dataRegion); + + return new LocalExecutionPlanContext( + typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1)); + } + /** * This method will init a timeJoinNode with @childNum seriesScanNode as children. * @@ -1168,4 +1730,20 @@ public class PipelineBuilderTest { } return topKNode; } + + private IdentitySinkNode initIdentitySinkNode(TypeProvider typeProvider) + throws IllegalPathException { + FullOuterTimeJoinNode fullOuterTimeJoinNode1 = initFullOuterTimeJoinNode(typeProvider, 3); + FullOuterTimeJoinNode fullOuterTimeJoinNode2 = initFullOuterTimeJoinNode(typeProvider, 3); + FullOuterTimeJoinNode fullOuterTimeJoinNode3 = initFullOuterTimeJoinNode(typeProvider, 3); + fullOuterTimeJoinNode1.setPlanNodeId(new PlanNodeId("FullOuterTimeJoinNode1")); + fullOuterTimeJoinNode2.setPlanNodeId(new PlanNodeId("FullOuterTimeJoinNode2")); + fullOuterTimeJoinNode3.setPlanNodeId(new PlanNodeId("FullOuterTimeJoinNode3")); + + IdentitySinkNode identitySinkNode = new IdentitySinkNode(new PlanNodeId("IdentitySinkNode")); + identitySinkNode.addChild(fullOuterTimeJoinNode1); + identitySinkNode.addChild(fullOuterTimeJoinNode2); + identitySinkNode.addChild(fullOuterTimeJoinNode3); + return identitySinkNode; + } }
