This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c577012169 [IOTDB-4005] Optimize the pipeline build logic for consumeAllNode
c577012169 is described below
commit c577012169cbed7e5c3a2c7a3f2471e44f8a3841
Author: Xiangwei Wei <[email protected]>
AuthorDate: Wed Feb 15 19:05:11 2023 +0800
[IOTDB-4005] Optimize the pipeline build logic for consumeAllNode
---
.../iotdb/it/env/cluster/MppCommonConfig.java | 6 +
.../it/env/cluster/MppSharedCommonConfig.java | 7 +
.../iotdb/it/env/remote/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../it/aggregation/IoTDBAggregationByLevel2IT.java | 40 +++++
.../it/aggregation/IoTDBAggregationByLevelIT.java | 4 +-
.../db/it/alignbydevice/IoTDBAlignByDevice2IT.java | 40 +++++
.../db/it/alignbydevice/IoTDBAlignByDeviceIT.java | 2 +-
.../IoTDBOrderByWithAlignByDevice2IT.java | 40 +++++
.../IoTDBOrderByWithAlignByDeviceIT.java | 2 +-
.../db/it/aligned/IoTDBAlignedSeriesQuery4IT.java | 54 +++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 171 +++++++++++++--------
.../plan/node/process/HorizontallyConcatNode.java | 1 -
.../db/mpp/plan/plan/PipelineBuilderTest.java | 9 +-
15 files changed, 310 insertions(+), 75 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index 37c86f1746..389d0f383a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -317,6 +317,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) {
+ setProperty("degree_of_query_parallelism",
String.valueOf(degreeOfParallelism));
+ return this;
+ }
+
@Override
public CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
setProperty("data_region_ratis_snapshot_trigger_threshold",
String.valueOf(threshold));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 9ff5b627d1..26e6a1d3ba 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -323,6 +323,13 @@ public class MppSharedCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) {
+ cnConfig.setDegreeOfParallelism(degreeOfParallelism);
+ dnConfig.setDegreeOfParallelism(degreeOfParallelism);
+ return this;
+ }
+
@Override
public CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
cnConfig.setDataRatisTriggerSnapshotThreshold(threshold);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index 71d6e7642b..9312a0f52d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -232,6 +232,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) {
+ return this;
+ }
+
@Override
public CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
return this;
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 50ac53bb86..81a50b6ebe 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -105,6 +105,8 @@ public interface CommonConfig {
CommonConfig setQueryThreadCount(int queryThreadCount);
+ CommonConfig setDegreeOfParallelism(int degreeOfParallelism);
+
CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold);
CommonConfig setSeriesSlotNum(int seriesSlotNum);
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevel2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevel2IT.java
new file mode 100644
index 0000000000..83a5086f7e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevel2IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBAggregationByLevel2IT extends IoTDBAggregationByLevelIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
index 97136f6b7e..50aac6cfc1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
@@ -52,7 +52,7 @@ import static org.junit.Assert.fail;
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBAggregationByLevelIT {
- private static final String[] dataSet =
+ protected static final String[] dataSet =
new String[] {
"CREATE DATABASE root.sg1",
"CREATE DATABASE root.sg2",
@@ -695,7 +695,7 @@ public class IoTDBAggregationByLevelIT {
}
}
- private static void prepareData() {
+ protected static void prepareData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice2IT.java
new file mode 100644
index 0000000000..efca7a3f54
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice2IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBAlignByDevice2IT extends IoTDBAlignByDeviceIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
index 9d264203f6..e9fa955cf9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
@@ -113,7 +113,7 @@ public class IoTDBAlignByDeviceIT {
EnvFactory.getEnv().cleanClusterEnvironment();
}
- private static void insertData() {
+ protected static void insertData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice2IT.java
new file mode 100644
index 0000000000..519f8e76d9
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice2IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBOrderByWithAlignByDevice2IT extends
IoTDBOrderByWithAlignByDeviceIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
index a6e766475b..fba9177148 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
@@ -89,7 +89,7 @@ public class IoTDBOrderByWithAlignByDeviceIT {
*
*
<p>https://docs.google.com/spreadsheets/d/18XlOIi27ZIIdRnar2WNXVMxkZwjgwlPZmzJLVpZRpAA/edit#gid=0
*/
- private static void insertData() {
+ protected static void insertData() {
try (Connection iotDBConnection = EnvFactory.getEnv().getConnection();
Statement statement = iotDBConnection.createStatement()) {
// create TimeSeries
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQuery4IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQuery4IT.java
new file mode 100644
index 0000000000..6c3104d0aa
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQuery4IT.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.aligned;
+
+import org.apache.iotdb.db.it.utils.AlignedWriteUtil;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAlignedSeriesQuery4IT extends IoTDBAlignedSeriesQueryIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setEnableSeqSpaceCompaction(false)
+ .setEnableUnseqSpaceCompaction(false)
+ .setEnableCrossSpaceCompaction(false)
+ .setMaxTsBlockLineNumber(3)
+ .setDegreeOfParallelism(4);
+ EnvFactory.getEnv().initClusterEnvironment();
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 77aadfda46..749c06efd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -318,7 +318,7 @@ public class IoTDBConfig {
/** How many threads can concurrently execute query statement. When <= 0,
use CPU core number. */
private int queryThreadCount = Runtime.getRuntime().availableProcessors();
- private int degreeOfParallelism = Runtime.getRuntime().availableProcessors()
/ 2;
+ private int degreeOfParallelism = Math.max(1,
Runtime.getRuntime().availableProcessors() / 2);
/** How many queries can be concurrently executed. When <= 0, use 1000. */
private int maxAllowedConcurrentQueries = 1000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 89880c5ca1..49706b4ebe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2258,77 +2258,86 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// Keep it since we may change the structure of origin children nodes
List<PlanNode> afterwardsNodes = new ArrayList<>();
// 1. Calculate localChildren size
- int localChildrenSize = 0;
- for (PlanNode child : node.getChildren()) {
- if (!(child instanceof ExchangeNode)) {
+ int localChildrenSize = 0, firstChildIndex = -1;
+ for (int i = 0; i < node.getChildren().size(); i++) {
+ if (!(node.getChildren().get(i) instanceof ExchangeNode)) {
localChildrenSize++;
+ firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex;
+ // deal with exchangeNode at head
+ } else if (firstChildIndex == -1) {
+ Operator childOperation = node.getChildren().get(i).accept(this,
context);
+ finalExchangeNum += 1;
+ parentPipelineChildren.add(childOperation);
+ afterwardsNodes.add(node.getChildren().get(i));
}
}
- // 2. divide every childNumInEachPipeline localChildren to different
pipeline
- int[] childNumInEachPipeline =
- getChildNumInEachPipeline(
- node.getChildren(), localChildrenSize,
context.getDegreeOfParallelism());
- // If dop > size(children) + 1, we can allocate extra dop to child node
- // Extra dop = dop - size(children), since dop = 1 means serial but not 0
- int childGroupNum = Math.min(context.getDegreeOfParallelism(),
localChildrenSize);
+ if (firstChildIndex == -1) {
+ context.setExchangeSumNum(finalExchangeNum);
+ return parentPipelineChildren;
+ }
+ // If dop > localChildrenSize + 1, we can allocate extra dop to child
node
+ // Extra dop = dop - localChildrenSize, since dop = 1 means serial but
not 0
int dopForChild = Math.max(1, context.getDegreeOfParallelism() -
localChildrenSize);
- int startIndex, endIndex = 0;
- for (int i = 0; i < childGroupNum; i++) {
- startIndex = endIndex;
- endIndex += childNumInEachPipeline[i];
- // Only if dop >= size(children) + 1, split all children to new
pipeline
- // Otherwise, the first group will belong to the parent pipeline
- if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize +
1) {
- for (int j = startIndex; j < endIndex; j++) {
- Operator childOperation = node.getChildren().get(j).accept(this,
context);
+ // If dop > localChildrenSize, we create one new pipeline for each child
+ if (context.getDegreeOfParallelism() > localChildrenSize) {
+ for (int i = firstChildIndex; i < node.getChildren().size(); i++) {
+ PlanNode childNode = node.getChildren().get(i);
+ if (childNode instanceof ExchangeNode) {
+ Operator childOperation = childNode.accept(this, context);
+ finalExchangeNum += 1;
parentPipelineChildren.add(childOperation);
- afterwardsNodes.add(node.getChildren().get(j));
+ } else {
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ subContext.setDegreeOfParallelism(dopForChild);
+
+ int originPipeNum = context.getPipelineNumber();
+ Operator sourceOperator = createNewPipelineForChildNode(context,
subContext, childNode);
+ parentPipelineChildren.add(sourceOperator);
+ dopForChild =
+ Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1
- originPipeNum));
+ finalExchangeNum += subContext.getExchangeSumNum() -
context.getExchangeSumNum() + 1;
}
- continue;
}
- LocalExecutionPlanContext subContext = context.createSubContext();
- subContext.setDegreeOfParallelism(dopForChild);
- // Create partial parent operator for children
- PlanNode partialParentNode = null;
- Operator partialParentOperator = null;
-
- int originPipeNum = context.getPipelineNumber();
- if (endIndex - startIndex == 1) {
- partialParentNode = node.getChildren().get(i);
- partialParentOperator = node.getChildren().get(i).accept(this,
subContext);
- } else {
- // PartialParentNode is equals to parentNode except children
- partialParentNode = node.createSubNode(i, startIndex, endIndex);
- partialParentOperator = partialParentNode.accept(this, subContext);
+ } else {
+ // If dop <= localChildrenSize, we have to divide every
childNumInEachPipeline localChildren
+ // to different pipeline
+ int[] childNumInEachPipeline =
+ getChildNumInEachPipeline(
+ node.getChildren(), localChildrenSize,
context.getDegreeOfParallelism());
+ int childGroupNum = Math.min(context.getDegreeOfParallelism(),
localChildrenSize);
+ int startIndex, endIndex = firstChildIndex;
+ for (int i = 0; i < childGroupNum; i++) {
+ startIndex = endIndex;
+ endIndex += childNumInEachPipeline[i];
+ // Only if dop >= size(children) + 1, split all children to new
pipeline
+ // Otherwise, the first group will belong to the parent pipeline
+ if (i == 0) {
+ for (int j = startIndex; j < endIndex; j++) {
+ Operator childOperation = node.getChildren().get(j).accept(this,
context);
+ parentPipelineChildren.add(childOperation);
+ afterwardsNodes.add(node.getChildren().get(j));
+ }
+ continue;
+ }
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ subContext.setDegreeOfParallelism(1);
+ // Create partial parent operator for children
+ PlanNode partialParentNode = null;
+ if (endIndex - startIndex == 1) {
+ partialParentNode = node.getChildren().get(startIndex);
+ } else {
+ // PartialParentNode is equals to parentNode except children
+ partialParentNode = node.createSubNode(i, startIndex, endIndex);
+ }
+
+ Operator sourceOperator =
+ createNewPipelineForChildNode(context, subContext,
partialParentNode);
+ parentPipelineChildren.add(sourceOperator);
+ afterwardsNodes.add(partialParentNode);
+ finalExchangeNum += subContext.getExchangeSumNum() -
context.getExchangeSumNum() + 1;
}
- // update dop for child
- dopForChild = Math.max(1, dopForChild -
(subContext.getPipelineNumber() - originPipeNum));
- ISinkHandle localSinkHandle =
- MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
- // Attention, there is no parent node, use first child node
instead
- subContext.getDriverContext(),
node.getChildren().get(i).getPlanNodeId().getId());
- subContext.setSinkHandle(localSinkHandle);
- subContext.addPipelineDriverFactory(partialParentOperator,
subContext.getDriverContext());
-
- ExchangeOperator sourceOperator =
- new ExchangeOperator(
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(), null,
ExchangeOperator.class.getSimpleName()),
- MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
- ((LocalSinkHandle)
localSinkHandle).getSharedTsBlockQueue(),
- context.getDriverContext()),
- partialParentNode.getPlanNodeId());
- context
- .getTimeSliceAllocator()
- .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
- parentPipelineChildren.add(sourceOperator);
- afterwardsNodes.add(partialParentNode);
- context.addExchangeOperator(sourceOperator);
- finalExchangeNum += subContext.getExchangeSumNum() -
context.getExchangeSumNum() + 1;
+ ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
}
- ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
}
context.setExchangeSumNum(finalExchangeNum);
return parentPipelineChildren;
@@ -2340,6 +2349,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
* operator, maybe we can allocate based on workload rather than child
number.
*
* <p>If child is ExchangeNode, it won't affect the children number of
current group.
+ *
+ * <p>This method can only be invoked when dop <= localChildrenSize.
*/
public int[] getChildNumInEachPipeline(
List<PlanNode> allChildren, int localChildrenSize, int dop) {
@@ -2347,9 +2358,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
int[] childNumInEachPipeline = new int[maxPipelineNum];
int avgChildNum = Math.max(1, localChildrenSize / dop);
// allocate remaining child to group from splitIndex
- int splitIndex =
- localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum -
localChildrenSize % dop;
- int pipelineIndex = 0, childIndex = 0;
+ int splitIndex = maxPipelineNum - localChildrenSize % dop;
+ int childIndex = 0;
+ // Skip ExchangeNode at head
+ while (childIndex < allChildren.size() && allChildren.get(childIndex)
instanceof ExchangeNode) {
+ childIndex++;
+ }
+ int pipelineIndex = 0;
while (pipelineIndex < maxPipelineNum) {
int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum +
1;
int originChildIndex = childIndex;
@@ -2368,6 +2383,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return childNumInEachPipeline;
}
+ private Operator createNewPipelineForChildNode(
+ LocalExecutionPlanContext context, LocalExecutionPlanContext subContext,
PlanNode childNode) {
+ Operator childOperation = childNode.accept(this, subContext);
+ ISinkHandle localSinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+ // Attention, there is no parent node, use first child node instead
+ subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+ subContext.setSinkHandle(localSinkHandle);
+ subContext.addPipelineDriverFactory(childOperation,
subContext.getDriverContext());
+
+ ExchangeOperator sourceOperator =
+ new ExchangeOperator(
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(), null,
ExchangeOperator.class.getSimpleName()),
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+ ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+ context.getDriverContext()),
+ childNode.getPlanNodeId());
+
+
context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(),
1);
+ context.addExchangeOperator(sourceOperator);
+ return sourceOperator;
+ }
+
public List<Operator> dealWithConsumeChildrenOneByOneNode(
PlanNode node, LocalExecutionPlanContext context) {
List<Operator> parentPipelineChildren = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
index f6c785f8fc..00d1b3c4f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
@@ -63,7 +63,6 @@ public class HorizontallyConcatNode extends MultiChildProcessNode {
return children.stream()
.map(PlanNode::getOutputColumnNames)
.flatMap(List::stream)
- .distinct()
.collect(Collectors.toList());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index c0ee46142b..d3db6d7811 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -679,22 +679,23 @@ public class PipelineBuilderTest {
@Test
public void testGetChildNumInEachPipeline() {
List<PlanNode> allChildren = new ArrayList<>();
- allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
int[] childNumInEachPipeline =
- operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+ operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2);
assertEquals(2, childNumInEachPipeline.length);
assertEquals(2, childNumInEachPipeline[0]);
assertEquals(1, childNumInEachPipeline[1]);
allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
- allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
- allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5")));
childNumInEachPipeline =
operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
assertEquals(3, childNumInEachPipeline.length);
assertEquals(2, childNumInEachPipeline[0]);