This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d2a71be993f Fix wrong calculation of ExchangeNum for consumeAllPipelineBreaker when dop = 1
d2a71be993f is described below

commit d2a71be993f68d8208b2c370d26b02ed711595a3
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Jan 16 20:17:05 2024 +0800

    Fix wrong calculation of ExchangeNum for consumeAllPipelineBreaker when dop = 1
---
 .../plan/planner/OperatorTreeGenerator.java        |  4 +-
 .../plan/planner/PipelineBuilderTest.java          | 55 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 094a543a916..cbf8db730de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2844,7 +2844,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       PlanNode node, LocalExecutionPlanContext context) {
     // children after pipelining
     List<Operator> parentPipelineChildren = new ArrayList<>();
-    int finalExchangeNum = context.getExchangeSumNum();
     if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 1) {
       // If dop = 1, we don't create extra pipeline
       for (PlanNode localChild : node.getChildren()) {
@@ -2852,6 +2851,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         parentPipelineChildren.add(childOperation);
       }
     } else {
+      int finalExchangeNum = context.getExchangeSumNum();
       // Keep it since we may change the structure of origin children nodes
       List<PlanNode> afterwardsNodes = new ArrayList<>();
       // 1. Calculate localChildren size
@@ -2947,8 +2947,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           throw new IllegalArgumentException("Unknown node type: " + node.getClass().getName());
         }
       }
+      context.setExchangeSumNum(finalExchangeNum);
     }
-    context.setExchangeSumNum(finalExchangeNum);
     return parentPipelineChildren;
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
index 3841da6405f..2ec572ede00 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
@@ -1007,6 +1007,37 @@ public class PipelineBuilderTest {
     assertEquals(2, context.getExchangeSumNum());
   }
 
+  @Test
+  public void testConsumeAllChildrenPipelineBuilderWithExchange() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    FullOuterTimeJoinNode fullOuterTimeJoinNode =
+        initFullOuterTimeJoinNodeWithExchangeNode(typeProvider, 3, 3);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(1);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(
+            fullOuterTimeJoinNode, context);
+    assertEquals(0, context.getPipelineNumber());
+    assertEquals(6, childrenOperator.size());
+    assertEquals(6, fullOuterTimeJoinNode.getChildren().size());
+    for (int i = 0; i < 3; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+      assertEquals(ExchangeNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass());
+    }
+
+    for (int i = 3; i < 6; i++) {
+      assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass());
+      assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i - 3),
+          fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the number exchange operator
+    assertEquals(3, context.getExchangeSumNum());
+  }
+
   private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
@@ -1046,6 +1077,30 @@ public class PipelineBuilderTest {
     return fullOuterTimeJoinNode;
   }
 
+  private FullOuterTimeJoinNode initFullOuterTimeJoinNodeWithExchangeNode(
+      TypeProvider typeProvider, int exchangeNum, int scanNum) throws IllegalPathException {
+    FullOuterTimeJoinNode fullOuterTimeJoinNode =
+        new FullOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC);
+    for (int i = 0; i < exchangeNum; i++) {
+      ExchangeNode exchangeNode =
+          new ExchangeNode(new PlanNodeId(String.format("FullOuterTimeJoinWithExchangeNode%d", i)));
+      exchangeNode.setUpstream(
+          new TEndPoint("127.0.0.2", 6667),
+          new FragmentInstanceId(new PlanFragmentId("q", i), "ds"),
+          new PlanNodeId("test"));
+      fullOuterTimeJoinNode.addChild(exchangeNode);
+    }
+    for (int i = 0; i < scanNum; i++) {
+      SeriesScanNode seriesScanNode =
+          new SeriesScanNode(
+              new PlanNodeId(String.format("SeriesScanNode%d", i)),
+              new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32));
+      typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32);
+      fullOuterTimeJoinNode.addChild(seriesScanNode);
+    }
+    return fullOuterTimeJoinNode;
+  }
+
   private LeftOuterTimeJoinNode initLeftOuterTimeJoinNode(TypeProvider typeProvider)
       throws IllegalPathException {
     LeftOuterTimeJoinNode leftOuterTimeJoinNode =

Reply via email to