This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d2a71be993f Fix wrong calculation of ExchangeNum for
consumeAllPipelineBreaker when dop = 1
d2a71be993f is described below
commit d2a71be993f68d8208b2c370d26b02ed711595a3
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Jan 16 20:17:05 2024 +0800
Fix wrong calculation of ExchangeNum for consumeAllPipelineBreaker when dop = 1
---
.../plan/planner/OperatorTreeGenerator.java | 4 +-
.../plan/planner/PipelineBuilderTest.java | 55 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 094a543a916..cbf8db730de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2844,7 +2844,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
PlanNode node, LocalExecutionPlanContext context) {
// children after pipelining
List<Operator> parentPipelineChildren = new ArrayList<>();
- int finalExchangeNum = context.getExchangeSumNum();
if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() ==
1) {
// If dop = 1, we don't create extra pipeline
for (PlanNode localChild : node.getChildren()) {
@@ -2852,6 +2851,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
parentPipelineChildren.add(childOperation);
}
} else {
+ int finalExchangeNum = context.getExchangeSumNum();
// Keep it since we may change the structure of origin children nodes
List<PlanNode> afterwardsNodes = new ArrayList<>();
// 1. Calculate localChildren size
@@ -2947,8 +2947,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
throw new IllegalArgumentException("Unknown node type: " +
node.getClass().getName());
}
}
+ context.setExchangeSumNum(finalExchangeNum);
}
- context.setExchangeSumNum(finalExchangeNum);
return parentPipelineChildren;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
index 3841da6405f..2ec572ede00 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
@@ -1007,6 +1007,37 @@ public class PipelineBuilderTest {
assertEquals(2, context.getExchangeSumNum());
}
+ @Test
+ public void testConsumeAllChildrenPipelineBuilderWithExchange() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ FullOuterTimeJoinNode fullOuterTimeJoinNode =
+ initFullOuterTimeJoinNodeWithExchangeNode(typeProvider, 3, 3);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(1);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(
+ fullOuterTimeJoinNode, context);
+ assertEquals(0, context.getPipelineNumber());
+ assertEquals(6, childrenOperator.size());
+ assertEquals(6, fullOuterTimeJoinNode.getChildren().size());
+ for (int i = 0; i < 3; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ assertEquals(ExchangeNode.class,
fullOuterTimeJoinNode.getChildren().get(i).getClass());
+ }
+
+ for (int i = 3; i < 6; i++) {
+ assertEquals(SeriesScanOperator.class,
childrenOperator.get(i).getClass());
+ assertEquals(SeriesScanNode.class,
fullOuterTimeJoinNode.getChildren().get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i - 3),
+
fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the number exchange operator
+ assertEquals(3, context.getExchangeSumNum());
+ }
+
private LocalExecutionPlanContext
createLocalExecutionPlanContext(TypeProvider typeProvider) {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
@@ -1046,6 +1077,30 @@ public class PipelineBuilderTest {
return fullOuterTimeJoinNode;
}
+ private FullOuterTimeJoinNode initFullOuterTimeJoinNodeWithExchangeNode(
+ TypeProvider typeProvider, int exchangeNum, int scanNum) throws
IllegalPathException {
+ FullOuterTimeJoinNode fullOuterTimeJoinNode =
+ new FullOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"),
Ordering.ASC);
+ for (int i = 0; i < exchangeNum; i++) {
+ ExchangeNode exchangeNode =
+ new ExchangeNode(new
PlanNodeId(String.format("FullOuterTimeJoinWithExchangeNode%d", i)));
+ exchangeNode.setUpstream(
+ new TEndPoint("127.0.0.2", 6667),
+ new FragmentInstanceId(new PlanFragmentId("q", i), "ds"),
+ new PlanNodeId("test"));
+ fullOuterTimeJoinNode.addChild(exchangeNode);
+ }
+ for (int i = 0; i < scanNum; i++) {
+ SeriesScanNode seriesScanNode =
+ new SeriesScanNode(
+ new PlanNodeId(String.format("SeriesScanNode%d", i)),
+ new MeasurementPath(String.format("root.sg.d%d.s1", i),
TSDataType.INT32));
+ typeProvider.setType(seriesScanNode.getSeriesPath().toString(),
TSDataType.INT32);
+ fullOuterTimeJoinNode.addChild(seriesScanNode);
+ }
+ return fullOuterTimeJoinNode;
+ }
+
private LeftOuterTimeJoinNode initLeftOuterTimeJoinNode(TypeProvider
typeProvider)
throws IllegalPathException {
LeftOuterTimeJoinNode leftOuterTimeJoinNode =