This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5f207d6febb655bafa3e0f8e0063e00d518c5285 Author: Alima777 <[email protected]> AuthorDate: Fri Feb 10 17:42:59 2023 +0800 Add consumeOneByOne unit tests --- .../db/mpp/plan/planner/PipelineDriverFactory.java | 4 + .../db/mpp/plan/plan/PipelineBuilderTest.java | 376 ++++++++++++++++++++- 2 files changed, 379 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java index e71107b7af..8cc9ec0e51 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java @@ -73,4 +73,8 @@ public class PipelineDriverFactory { public void setDependencyPipeline(int dependencyPipelineIndex) { this.dependencyPipelineIndex = dependencyPipelineIndex; } + + public int getDependencyPipelineIndex() { + return dependencyPipelineIndex; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java index 2f487ec69d..29c84d10e0 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.plan; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; @@ -29,13 +30,16 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.Operator; +import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext; import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -226,7 +230,7 @@ public class PipelineBuilderTest { } /** - * This test will test dop = 4. Expected result is five pipelines: + * This test will test dop = 5. Expected result is five pipelines: * * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator, * ExchangeOperator]; @@ -283,6 +287,356 @@ public class PipelineBuilderTest { assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId()); } + /** + * This test will test dop = 6. Expected result is still five pipelines: + * + * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1. + * + * <p>The forth is: ExchangeOperator - SeriesScan2. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3. 
+ */ + @Test + public void testConsumeAllChildrenPipelineBuilder6() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(6); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the changes of node structure + assertEquals(4, timeJoinNode.getChildren().size()); + for (int i = 0; i < 4; i++) { + assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals( + String.format("root.sg.d%d.s1", i), + timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId()); + } + + /** + * The operator structure is [DeviceView - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]]. 
+ * + * <p>The next six tests, I will test this DeviceViewOperator with different dop. + * + * <p>The first test will test dop = 1. Expected result is that no child pipelines will be + * divided. + */ + @Test + public void testConsumeOneByOneChildrenPipelineBuilder1() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(1); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); + assertEquals(0, context.getPipelineNumber()); + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(AlignedSeriesScanOperator.class, childrenOperator.get(i).getClass()); + assertEquals( + String.format("root.sg.d%d.s1", i), + deviceViewNode.getChildren().get(i).getOutputColumnNames().get(0)); + } + } + + /** + * This test will test dop = 2. Expected result is five pipelines with dependency: + * + * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1, which has dependency second pipeline. + * + * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency third pipeline. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency forth pipeline. 
+ */ + @Test + public void testConsumeOneByOneChildrenPipelineBuilder2() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(2); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); + assertEquals(0, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); + assertEquals(1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); + assertEquals(2, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); + } + + /** + * This test will test dop = 
3. Expected result is five pipelines with dependency: + * + * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1. + * + * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency second pipeline. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency third pipeline. + */ + @Test + public void testConsumeOneByOneChildrenPipelineBuilder3() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(3); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + 
assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); + assertEquals(0, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); + assertEquals(1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); + } + + /** + * This test will test dop = 4. Expected result is five pipelines with dependency: + * + * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1. + * + * <p>The forth is: ExchangeOperator - SeriesScan2. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency second pipeline. + */ + @Test + public void testConsumeOneByOneChildrenPipelineBuilder4() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(4); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); + assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); + assertEquals(0, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); + } + + /** + * This test will test dop = 5. Expected result is five pipelines without dependency: + * + * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1. + * + * <p>The forth is: ExchangeOperator - SeriesScan2. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3. 
+ */ + @Test + public void testConsumeOneByOneChildrenPipelineBuilder5() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(5); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); + + // Validate the forth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); + } + + /** + * This test will test dop 
= 5. Expected result is five pipelines without dependency: + * + * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1. + * + * <p>The fourth is: ExchangeOperator - SeriesScan2. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3. + */ + @Test + public void testConsumeOneByOneChildrenPipelineBuilder6() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(5); + + List<Operator> childrenOperator = + operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context); + // The number of pipeline is 4, since parent pipeline hasn't joined + assertEquals(4, context.getPipelineNumber()); + + // Validate the first pipeline + assertEquals(4, childrenOperator.size()); + for (int i = 0; i < 4; i++) { + assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass()); + } + + // Validate the second pipeline + ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0); + assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex()); + + // Validate the third pipeline + ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1); + assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex()); + + // Validate the fourth pipeline + ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2); + assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId()); + assertEquals(-1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex()); + + // Validate the fifth pipeline + ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3); + assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId()); + assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex()); + } + private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) { ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); @@ -319,4 +673,24 @@ public class PipelineBuilderTest { } return timeJoinNode; } + + /** + * This method will init a DeviceViewNode with @childNum alignedSeriesScanNode as children. + * + * @param childNum the number of children + * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children + */ + private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int childNum) + throws IllegalPathException { + DeviceViewNode deviceViewNode = + new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null); + for (int i = 0; i < childNum; i++) { + AlignedSeriesScanNode alignedSeriesScanNode = + new AlignedSeriesScanNode( + new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)), + new AlignedPath(String.format("root.sg.d%d", i), "s1")); + deviceViewNode.addChild(alignedSeriesScanNode); + } + return deviceViewNode; + } }
