This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1b9c9eff30 [IOTDB-5716] Wrong dependency when pipeline
consumeOneByOneOperator
1b9c9eff30 is described below
commit 1b9c9eff300f657e2098545685c2d73277e72f4d
Author: Xiangwei Wei <[email protected]>
AuthorDate: Thu Mar 23 10:29:16 2023 +0800
[IOTDB-5716] Wrong dependency when pipeline consumeOneByOneOperator
---
.../db/mpp/execution/schedule/DriverScheduler.java | 2 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 3 +-
.../db/mpp/plan/plan/PipelineBuilderTest.java | 96 +++++++++++++++++++++-
3 files changed, 98 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 5179a57b3a..6cd893f441 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -311,7 +311,6 @@ public class DriverScheduler implements IDriverScheduler,
IService {
readyQueue.decreaseReservedSize();
break;
case FINISHED:
- readyQueue.decreaseReservedSize();
break;
}
@@ -482,6 +481,7 @@ public class DriverScheduler implements IDriverScheduler,
IService {
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.FINISHED);
+ readyQueue.decreaseReservedSize();
} finally {
task.unlock();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0e556f45bb..1083fafcfc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2587,8 +2587,9 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
if (sumOfChildPipelines > dopForChild) {
// Update dependencyPipeId, after which finishes we can submit
curChildPipeline
while (sumOfChildPipelines > dopForChild) {
- dependencyPipeId = context.getPipelineNumber() -
sumOfChildPipelines;
sumOfChildPipelines -=
childPipelineNums.get(dependencyChildNode);
+ // The dependency pipeline must be a parent pipeline rather than
a child pipeline
+ dependencyPipeId = context.getPipelineNumber() -
sumOfChildPipelines - 1;
sumOfChildExchangeNums -=
childExchangeNums.get(dependencyChildNode);
dependencyChildNode++;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index d3db6d7811..56263126ef 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.plan.plan;
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -34,15 +37,20 @@ import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperat
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,6 +58,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -621,7 +630,7 @@ public class PipelineBuilderTest {
}
/**
- * This test will test dop = 5. Expected result is five pipelines without
dependency:
+ * This test will test dop = 6. Expected result is five pipelines without
dependency:
*
* <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
* ExchangeOperator];
@@ -676,6 +685,91 @@ public class PipelineBuilderTest {
assertEquals(4, context.getExchangeSumNum());
}
+ /**
+ * The operator structure is:
+ *
+ * <p>DeviceViewOperator - [AggregationOperator1 -
[SeriesAggregationScanOperator1,
+ * ExchangeOperator1]], [AggregationOperator2 -
[SeriesAggregationScanOperator2,
+ * ExchangeOperator2]].
+ *
+ * <p>This test will test dop = 3. Expected result is five pipelines with
dependency:
+ *
+ * <p>The pipeline0 is: ExchangeOperator - SeriesAggregationScanOperator1.
+ *
+ * <p>The pipeline1 is: ExchangeOperator - AggregationOperator1.
+ *
+ * <p>The pipeline2 is: ExchangeOperator - SeriesAggregationScanOperator2,
which has dependency 1.
+ *
+ * <p>The pipeline3 is: ExchangeOperator - AggregationOperator2, which has
dependency1.
+ *
+ * <p>The pipeline4 is: DeviceView - [ExchangeOperator, ExchangeOperator]
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilderDependency() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("root.sg.d0.s1", TSDataType.INT64);
+ typeProvider.setType("root.sg.d1.s1", TSDataType.INT64);
+ typeProvider.setType("count(root.sg.d0.s1)", TSDataType.INT64);
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null);
+ for (int i = 0; i < 2; i++) {
+ PartialPath path = new MeasurementPath(String.format("root.sg.d%d.s1",
i), TSDataType.INT64);
+ AggregationNode aggregationNode =
+ new AggregationNode(
+ new PlanNodeId(String.format("AggregationOperator%d", i)),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ TAggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(path)))),
+ null,
+ Ordering.ASC);
+ SeriesAggregationScanNode seriesAggregationScanNode =
+ new SeriesAggregationScanNode(
+ new PlanNodeId(String.format("seriesAggregationScanNode%d", i)),
+ (MeasurementPath) path,
+ Collections.singletonList(
+ new AggregationDescriptor(
+ TAggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new
TimeSeriesOperand(path)))));
+ ExchangeNode exchangeNode =
+ new ExchangeNode(new PlanNodeId(String.format("ExchangeNode%d", i)));
+ exchangeNode.setUpstream(
+ new TEndPoint("127.0.0.1", 6667),
+ new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+ new PlanNodeId("test"));
+ aggregationNode.addChild(seriesAggregationScanNode);
+ aggregationNode.addChild(exchangeNode);
+ deviceViewNode.addChild(aggregationNode);
+ }
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(3);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ assertEquals(2, childrenOperator.size());
+ for (int i = 0; i < 2; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the first pipeline
+ assertEquals(-1,
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the second pipeline
+ assertEquals(-1,
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ assertEquals(1,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the forth pipeline
+ assertEquals(1,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+ }
+
@Test
public void testGetChildNumInEachPipeline() {
List<PlanNode> allChildren = new ArrayList<>();