This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b9c9eff30 [IOTDB-5716] Wrong dependency when pipeline 
consumeOneByOneOperator
1b9c9eff30 is described below

commit 1b9c9eff300f657e2098545685c2d73277e72f4d
Author: Xiangwei Wei <[email protected]>
AuthorDate: Thu Mar 23 10:29:16 2023 +0800

    [IOTDB-5716] Wrong dependency when pipeline consumeOneByOneOperator
---
 .../db/mpp/execution/schedule/DriverScheduler.java |  2 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  3 +-
 .../db/mpp/plan/plan/PipelineBuilderTest.java      | 96 +++++++++++++++++++++-
 3 files changed, 98 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 5179a57b3a..6cd893f441 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -311,7 +311,6 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
           readyQueue.decreaseReservedSize();
           break;
         case FINISHED:
-          readyQueue.decreaseReservedSize();
           break;
       }
 
@@ -482,6 +481,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
+        readyQueue.decreaseReservedSize();
       } finally {
         task.unlock();
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0e556f45bb..1083fafcfc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2587,8 +2587,9 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           if (sumOfChildPipelines > dopForChild) {
             // Update dependencyPipeId, after which finishes we can submit 
curChildPipeline
             while (sumOfChildPipelines > dopForChild) {
-              dependencyPipeId = context.getPipelineNumber() - 
sumOfChildPipelines;
               sumOfChildPipelines -= 
childPipelineNums.get(dependencyChildNode);
+              // The dependency pipeline must be a parent pipeline rather than 
a child pipeline
+              dependencyPipeId = context.getPipelineNumber() - 
sumOfChildPipelines - 1;
               sumOfChildExchangeNums -= 
childExchangeNums.get(dependencyChildNode);
               dependencyChildNode++;
             }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index d3db6d7811..56263126ef 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.db.mpp.plan.plan;
 
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -34,15 +37,20 @@ import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperat
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
 import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -50,6 +58,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -621,7 +630,7 @@ public class PipelineBuilderTest {
   }
 
   /**
-   * This test will test dop = 5. Expected result is five pipelines without 
dependency:
+   * This test will test dop = 6. Expected result is five pipelines without 
dependency:
    *
    * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, 
ExchangeOperator,
    * ExchangeOperator];
@@ -676,6 +685,91 @@ public class PipelineBuilderTest {
     assertEquals(4, context.getExchangeSumNum());
   }
 
+  /**
+   * The operator structure is:
+   *
+   * <p>DeviceViewOperator - [AggregationOperator1 - 
[SeriesAggregationScanOperator1,
+   * ExchangeOperator1]], [AggregationOperator2 - 
[SeriesAggregationScanOperator2,
+   * ExchangeOperator2]].
+   *
+   * <p>This test will test dop = 3. Expected result is five pipelines with 
dependency:
+   *
+   * <p>The pipeline0 is: ExchangeOperator - SeriesAggregationScanOperator1.
+   *
+   * <p>The pipeline1 is: ExchangeOperator - AggregationOperator1.
+   *
+   * <p>The pipeline2 is: ExchangeOperator - SeriesAggregationScanOperator2, 
which has dependency 1.
+   *
+   * <p>The pipeline3 is: ExchangeOperator - AggregationOperator2, which has 
dependency1.
+   *
+   * <p>The pipeline4 is: DeviceView - [ExchangeOperator, ExchangeOperator]
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilderDependency() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    typeProvider.setType("root.sg.d0.s1", TSDataType.INT64);
+    typeProvider.setType("root.sg.d1.s1", TSDataType.INT64);
+    typeProvider.setType("count(root.sg.d0.s1)", TSDataType.INT64);
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    DeviceViewNode deviceViewNode =
+        new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null);
+    for (int i = 0; i < 2; i++) {
+      PartialPath path = new MeasurementPath(String.format("root.sg.d%d.s1", 
i), TSDataType.INT64);
+      AggregationNode aggregationNode =
+          new AggregationNode(
+              new PlanNodeId(String.format("AggregationOperator%d", i)),
+              Collections.singletonList(
+                  new AggregationDescriptor(
+                      TAggregationType.COUNT.name().toLowerCase(),
+                      AggregationStep.FINAL,
+                      Collections.singletonList(new TimeSeriesOperand(path)))),
+              null,
+              Ordering.ASC);
+      SeriesAggregationScanNode seriesAggregationScanNode =
+          new SeriesAggregationScanNode(
+              new PlanNodeId(String.format("seriesAggregationScanNode%d", i)),
+              (MeasurementPath) path,
+              Collections.singletonList(
+                  new AggregationDescriptor(
+                      TAggregationType.COUNT.name().toLowerCase(),
+                      AggregationStep.PARTIAL,
+                      Collections.singletonList(new 
TimeSeriesOperand(path)))));
+      ExchangeNode exchangeNode =
+          new ExchangeNode(new PlanNodeId(String.format("ExchangeNode%d", i)));
+      exchangeNode.setUpstream(
+          new TEndPoint("127.0.0.1", 6667),
+          new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+          new PlanNodeId("test"));
+      aggregationNode.addChild(seriesAggregationScanNode);
+      aggregationNode.addChild(exchangeNode);
+      deviceViewNode.addChild(aggregationNode);
+    }
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(3);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    assertEquals(2, childrenOperator.size());
+    for (int i = 0; i < 2; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the first pipeline
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the second pipeline
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the third pipeline
+    assertEquals(1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the forth pipeline
+    assertEquals(1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+  }
+
   @Test
   public void testGetChildNumInEachPipeline() {
     List<PlanNode> allChildren = new ArrayList<>();

Reply via email to