This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 082fd7d0b7d49e87fc5cc75a76f5695efa817a05 Author: Alima777 <[email protected]> AuthorDate: Fri Feb 10 17:00:13 2023 +0800 Add consumeAll unit tests --- .../process/join/RowBasedTimeJoinOperator.java | 6 + .../db/mpp/plan/planner/OperatorTreeGenerator.java | 4 +- .../db/mpp/plan/plan/PipelineBuilderTest.java | 322 +++++++++++++++++++++ 3 files changed, 330 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java index 50cccb9573..79f9905ba8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.join; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.operator.process.AbstractProcessOperator; @@ -290,6 +291,11 @@ public class RowBasedTimeJoinOperator extends AbstractProcessOperator { timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index])); } + @TestOnly + public List<Operator> getChildren() { + return children; + } + /** * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else * return false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 21156e3f82..dff2e57965 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2211,7 +2211,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPathPatternList(), node.getTemplateId())); } - private List<Operator> dealWithConsumeAllChildrenPipelineBreaker( + public List<Operator> dealWithConsumeAllChildrenPipelineBreaker( PlanNode node, LocalExecutionPlanContext context) { // children after pipelining LinkedList<Operator> parentPipelineChildren = new LinkedList<>(); @@ -2325,7 +2325,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return childNumInEachPipeline; } - private List<Operator> dealWithConsumeChildrenOneByOneNode( + public List<Operator> dealWithConsumeChildrenOneByOneNode( PlanNode node, LocalExecutionPlanContext context) { List<Operator> parentPipelineChildren = new ArrayList<>(); int originExchangeNum = context.getExchangeSumNum(); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java new file mode 100644 index 0000000000..2f487ec69d --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.plan; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.common.PlanFragmentId; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.mpp.execution.operator.Operator; +import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; +import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext; +import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; + +public class PipelineBuilderTest { + + OperatorTreeGenerator operatorTreeGenerator = new OperatorTreeGenerator(); + + /** + * The operator structure is [TimeJoin1 - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]]. + * + * <p>The next six tests, I will test this TimeJoinOperator with different dop. + * + * <p>The first test will test dop = 1. Expected result is that no child pipelines will be + * divided. + */ + @Test + public void testConsumeAllChildrenPipelineBuilder1() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(1); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + assertEquals(0, context.getPipelineNumber()); + assertEquals(4, childrenOperator.size()); + assertEquals(4, timeJoinNode.getChildren().size()); + for (int i = 0; i < 4; i++) { + assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); + assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals( + String.format("root.sg.d%d.s1", i), + timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + } + } + + /** + * This test will test dop = 2. Expected result is two pipelines: + * + * <p>The first is: TimeJoin1 - [SeriesScan1, SeriesScan0, ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3]. + */ + @Test + public void testConsumeAllChildrenPipelineBuilder2() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(2); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + // The number of pipeline is 1, since parent pipeline hasn't joined + assertEquals(1, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(3, childrenOperator.size()); + assertEquals(3, timeJoinNode.getChildren().size()); + for (int i = 0; i < 2; i++) { + assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); + assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + } + assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); + + // Validate the changes of node structure + assertEquals("root.sg.d1.s1", timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + assertEquals("root.sg.d0.s1", timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + assertEquals(TimeJoinNode.class, timeJoinNode.getChildren().get(2).getClass()); + + // Validate the second pipeline + TimeJoinNode subTimeJoinNode = (TimeJoinNode) timeJoinNode.getChildren().get(2); + assertEquals(2, subTimeJoinNode.getChildren().size()); + assertEquals( + "root.sg.d2.s1", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + assertEquals( + "root.sg.d3.s1", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + } + + /** + * This test will test dop = 3. Expected result is three pipelines: + * + * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan1. + * + * <p>The third is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3]. + */ + @Test + public void testConsumeAllChildrenPipelineBuilder3() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(3); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + // The number of pipeline is 2, since parent pipeline hasn't joined + assertEquals(2, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(3, childrenOperator.size()); + assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass()); + assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass()); + assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); + + // Validate the changes of node structure + assertEquals(3, timeJoinNode.getChildren().size()); + assertEquals("root.sg.d0.s1", timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + assertEquals("root.sg.d1.s1", timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + assertEquals(TimeJoinNode.class, timeJoinNode.getChildren().get(2).getClass()); + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId()); + + // Validate the third pipeline + TimeJoinNode subTimeJoinNode = (TimeJoinNode) timeJoinNode.getChildren().get(2); + assertEquals(2, subTimeJoinNode.getChildren().size()); + assertEquals( + "root.sg.d2.s1", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + assertEquals( + "root.sg.d3.s1", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2); + assertEquals(exchangeOperator2.getSourceId(), subTimeJoinNode.getPlanNodeId()); + } + + /** + * This test will test dop = 4. Expected result is four pipelines: + * + * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan1. + * + * <p>The third is: ExchangeOperator - SeriesScan2. + * + * <p>The forth is: ExchangeOperator - SeriesScan3. + */ + @Test + public void testConsumeAllChildrenPipelineBuilder4() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(4); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + // The number of pipeline is 3, since parent pipeline hasn't joined + assertEquals(3, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass()); + assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass()); + assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); + assertEquals(ExchangeOperator.class, childrenOperator.get(3).getClass()); + + // Validate the changes of node structure + assertEquals(4, timeJoinNode.getChildren().size()); + for (int i = 0; i < 4; i++) { + assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals( + String.format("root.sg.d%d.s1", i), + timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("SeriesScanNode2", exchangeOperator2.getSourceId().getId()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("SeriesScanNode3", exchangeOperator3.getSourceId().getId()); + } + + /** + * This test will test dop = 4. Expected result is five pipelines: + * + * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1. + * + * <p>The forth is: ExchangeOperator - SeriesScan2. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3. + */ + @Test + public void testConsumeAllChildrenPipelineBuilder5() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(5); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the changes of node structure + assertEquals(4, timeJoinNode.getChildren().size()); + for (int i = 0; i < 4; i++) { + assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals( + String.format("root.sg.d%d.s1", i), + timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId()); + } + + private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + fragmentInstanceContext.setDataRegion(dataRegion); + + return new LocalExecutionPlanContext(typeProvider, fragmentInstanceContext); + } + + /** + * This method will init a timeJoinNode with @childNum seriesScanNode as children. + * + * @param childNum the number of children + * @return a timeJoinNode with @childNum seriesScanNode as children + */ + private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int childNum) + throws IllegalPathException { + TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); + for (int i = 0; i < childNum; i++) { + SeriesScanNode seriesScanNode = + new SeriesScanNode( + new PlanNodeId(String.format("SeriesScanNode%d", i)), + new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); + typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); + timeJoinNode.addChild(seriesScanNode); + } + return timeJoinNode; + } +}
