This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.1 by this push:
new 9e8688183ea [IOTDB-6297] Optimize the distribute plan in aggregation
align by device when some device cross data regions
9e8688183ea is described below
commit 9e8688183ea28ecf50f1e42a87476733403ac5b3
Author: Beyyes <[email protected]>
AuthorDate: Fri Mar 8 11:21:34 2024 +0800
[IOTDB-6297] Optimize the distribute plan in aggregation align by device
when some device cross data regions
---
.../db/it/alignbydevice/IoTDBAlignByDevice3IT.java | 43 ++
.../IoTDBAlignByDeviceWithTemplate2IT.java | 43 ++
.../IoTDBAlignByDeviceWithTemplateIT.java | 2 +-
.../IoTDBOrderByLimitOffsetAlignByDevice2IT.java | 46 +++
.../IoTDBOrderByWithAlignByDevice3IT.java | 40 ++
.../db/it/alignbydevice/IoTDBShuffleSink1IT.java | 1 +
.../db/it/alignbydevice/IoTDBShuffleSink2IT.java | 1 +
.../execution/aggregation/Accumulator.java | 9 +
.../execution/aggregation/AvgAccumulator.java | 5 +
.../aggregation/FirstValueAccumulator.java | 5 +
.../aggregation/LastValueAccumulator.java | 5 +
.../aggregation/TimeDurationAccumulator.java | 5 +
.../process/AggregationMergeSortOperator.java | 286 +++++++++++++
.../db/queryengine/plan/analyze/Analysis.java | 10 -
.../plan/planner/OperatorTreeGenerator.java | 74 +++-
.../plan/planner/distribution/SourceRewriter.java | 196 +++++++--
.../node/process/AggregationMergeSortNode.java | 56 ++-
.../planner/plan/node/process/DeviceViewNode.java | 6 +-
.../plan/parameter/AggregationDescriptor.java | 8 +-
.../distribution/AggregationDistributionTest.java | 26 +-
.../AlignByDeviceOrderByLimitOffsetTest.java | 123 ++++--
.../planner/distribution/AlignedByDeviceTest.java | 446 ++++-----------------
.../datanode1conf/iotdb-common.properties | 1 -
.../datanode3conf/iotdb-common.properties | 1 -
24 files changed, 983 insertions(+), 455 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice3IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice3IT.java
new file mode 100644
index 00000000000..68b2c40cc6b
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice3IT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Set series_slot_num to 1 so that as many devices as possible cross data
regions.
+ */
+public class IoTDBAlignByDevice3IT extends IoTDBAlignByDeviceIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplate2IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplate2IT.java
new file mode 100644
index 00000000000..a4da0898f75
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplate2IT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Set series_slot_num to 1 so that as many devices as possible cross data
regions.
+ */
+public class IoTDBAlignByDeviceWithTemplate2IT extends
IoTDBAlignByDeviceWithTemplateIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
index b87f5df60f3..05e2104b904 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
@@ -957,7 +957,7 @@ public class IoTDBAlignByDeviceWithTemplateIT {
retArray);
}
- private static void insertData() {
+ protected static void insertData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDevice2IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDevice2IT.java
new file mode 100644
index 00000000000..85713e87e5d
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDevice2IT.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static
org.apache.iotdb.db.it.alignbydevice.IoTDBOrderByWithAlignByDeviceIT.insertData;
+import static
org.apache.iotdb.db.it.alignbydevice.IoTDBOrderByWithAlignByDeviceIT.insertData2;
+
+public class IoTDBOrderByLimitOffsetAlignByDevice2IT
+ extends IoTDBOrderByLimitOffsetAlignByDeviceIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ insertData2();
+ insertData3();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice3IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice3IT.java
new file mode 100644
index 00000000000..011972ea471
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice3IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBOrderByWithAlignByDevice3IT extends
IoTDBOrderByWithAlignByDeviceIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
index 73cc3c71dee..484fd5d9d94 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
@@ -56,6 +56,7 @@ public class IoTDBShuffleSink1IT {
@BeforeClass
public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(2);
EnvFactory.getEnv().initClusterEnvironment();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
index 8558e1f6442..cd29526e956 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
@@ -72,6 +72,7 @@ public class IoTDBShuffleSink2IT {
@BeforeClass
public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(3);
EnvFactory.getEnv().initClusterEnvironment();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
index 75091443fd0..2065f899d9b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
@@ -88,4 +88,13 @@ public interface Accumulator {
TSDataType[] getIntermediateType();
TSDataType getFinalType();
+
+ /**
+ * The return value equals the length of tsBlockBuilder in {@link
+ * #outputIntermediate(ColumnBuilder[])}. Currently only the aggregations `Avg,
FirstValue, LastValue,
+ * TimeDuration` return 2.
+ */
+ default int getPartialResultSize() {
+ return 1;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
index 53f6d50ed3d..7d8604da5b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
@@ -163,6 +163,11 @@ public class AvgAccumulator implements Accumulator {
return TSDataType.DOUBLE;
}
+ @Override
+ public int getPartialResultSize() {
+ return 2;
+ }
+
private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
for (int i = 0; i <= lastIndex; i++) {
if (bitMap != null && !bitMap.isMarked(i)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
index 2cfffa26d08..04f186b96fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
@@ -252,6 +252,11 @@ public class FirstValueAccumulator implements Accumulator {
return firstValue.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 2;
+ }
+
protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
for (int i = 0; i <= lastIndex; i++) {
if (bitMap != null && !bitMap.isMarked(i)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
index b863812c444..c1ffa9fa6b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
@@ -252,6 +252,11 @@ public class LastValueAccumulator implements Accumulator {
return lastValue.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 2;
+ }
+
protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
for (int i = 0; i <= lastIndex; i++) {
if (bitMap != null && !bitMap.isMarked(i)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
index 84cd5a464ae..6a0045b3f8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
@@ -112,6 +112,11 @@ public class TimeDurationAccumulator implements
Accumulator {
return TSDataType.INT64;
}
+ @Override
+ public int getPartialResultSize() {
+ return 2;
+ }
+
protected void updateMaxTime(long curTime) {
initResult = true;
maxTime = Math.max(maxTime, curTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
new file mode 100644
index 00000000000..52d4a6d93d2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
+
+ private final List<Accumulator> accumulators;
+
+ private final List<TSDataType> dataTypes;
+
+ private final TsBlockBuilder tsBlockBuilder;
+
+ private final boolean[] noMoreTsBlocks;
+
+ private final MergeSortHeap mergeSortHeap;
+
+ private final boolean hasGroupBy;
+
+ private boolean finished;
+
+ private Binary lastDevice;
+
+ private long lastTime;
+
+ public AggregationMergeSortOperator(
+ OperatorContext operatorContext,
+ List<Operator> children,
+ List<TSDataType> dataTypes,
+ List<Accumulator> accumulators,
+ boolean hasGroupBy,
+ Comparator<SortKey> comparator) {
+ super(operatorContext, children);
+ this.dataTypes = dataTypes;
+ this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+ this.accumulators = accumulators;
+ this.hasGroupBy = hasGroupBy;
+ this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ long startTime = System.nanoTime();
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+ // init all elements in inputTsBlocks
+ if (!prepareInput()) {
+ return null;
+ }
+
+ tsBlockBuilder.reset();
+ while (!mergeSortHeap.isEmpty()) {
+ MergeSortKey mergeSortKey = mergeSortHeap.poll();
+ TsBlock targetBlock = mergeSortKey.tsBlock;
+ int rowIndex = mergeSortKey.rowIndex;
+ Binary currentDevice = targetBlock.getColumn(0).getBinary(rowIndex);
+ long currentTime = targetBlock.getTimeByIndex(rowIndex);
+ if (lastDevice != null && (!currentDevice.equals(lastDevice) ||
currentTime != lastTime)) {
+ outputResultToTsBlock();
+ }
+
+ lastDevice = currentDevice;
+ lastTime = currentTime;
+
+ int cnt = 1;
+ for (Accumulator accumulator : accumulators) {
+ if (accumulator.getPartialResultSize() == 2) {
+ Column first =
+ hasGroupBy
+ ? targetBlock.getColumn(cnt++).subColumn(rowIndex)
+ : targetBlock.getColumn(cnt++);
+ Column second =
+ hasGroupBy
+ ? targetBlock.getColumn(cnt++).subColumn(rowIndex)
+ : targetBlock.getColumn(cnt++);
+ accumulator.addIntermediate(new Column[] {first, second});
+ } else {
+ Column column =
+ hasGroupBy
+ ? targetBlock.getColumn(cnt++).subColumn(rowIndex)
+ : targetBlock.getColumn(cnt++);
+ accumulator.addIntermediate(new Column[] {column});
+ }
+ }
+
+ if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() -
1) {
+ inputTsBlocks[mergeSortKey.inputChannelIndex] = null;
+ break;
+ } else {
+ mergeSortKey.rowIndex++;
+ mergeSortHeap.push(mergeSortKey);
+ }
+
+ // break if time is out or tsBlockBuilder is full
+ if (System.nanoTime() - startTime > maxRuntime ||
tsBlockBuilder.isFull()) {
+ break;
+ }
+ }
+
+ if (mergeSortHeap.isEmpty()) {
+ outputResultToTsBlock();
+ }
+
+ return tsBlockBuilder.build();
+ }
+
+ private void outputResultToTsBlock() {
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
+ timeBuilder.writeLong(lastTime);
+ valueColumnBuilders[0].writeBinary(lastDevice);
+ for (int i = 1; i < dataTypes.size(); i++) {
+ accumulators.get(i - 1).outputFinal(valueColumnBuilders[i]);
+ }
+ tsBlockBuilder.declarePosition();
+ accumulators.forEach(Accumulator::reset);
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ if (finished) {
+ return false;
+ }
+
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (isInputNotEmpty(i)) {
+ return true;
+ } else if (!noMoreTsBlocks[i]) {
+ if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
+ return true;
+ } else {
+ handleFinishedChild(i);
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ boolean hasReadyChild = false;
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (noMoreTsBlocks[i] || isInputNotEmpty(i) || children.get(i) == null) {
+ continue;
+ }
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
+ if (blocked.isDone()) {
+ hasReadyChild = true;
+ // only when not blocked, canCallNext[i] equals true
+ canCallNext[i] = true;
+ } else {
+ listenableFutures.add(blocked);
+ }
+ }
+
+ return (hasReadyChild || listenableFutures.isEmpty())
+ ? NOT_BLOCKED
+ : successfulAsList(listenableFutures);
+ }
+
+ @Override
+ public boolean isFinished() throws Exception {
+ if (finished) {
+ return true;
+ }
+
+ finished = true;
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!noMoreTsBlocks[i] || isInputNotEmpty(i)) {
+ finished = false;
+ break;
+ }
+ }
+ return finished;
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ final Operator operator = children.get(i);
+ if (operator != null) {
+ operator.close();
+ }
+ }
+ }
+
+ @Override
+ protected void handleFinishedChild(int currentChildIndex) throws Exception {
+ // this method is invoked when children.get(currentChildIndex).hasNext
returns false
+ noMoreTsBlocks[currentChildIndex] = true;
+ inputTsBlocks[currentChildIndex] = null;
+ children.get(currentChildIndex).close();
+ children.set(currentChildIndex, null);
+ }
+
+ @Override
+ protected boolean canSkipCurrentChild(int currentChildIndex) {
+ return noMoreTsBlocks[currentChildIndex]
+ || !isEmpty(currentChildIndex)
+ || children.get(currentChildIndex) == null;
+ }
+
+ @Override
+ protected void processCurrentInputTsBlock(int currentInputIndex) {
+ mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0,
currentInputIndex));
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory =
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ // inputTsBlocks will cache all the tsBlocks returned by inputOperators
+ for (Operator operator : children) {
+ maxPeekMemory += operator.calculateMaxReturnSize();
+ maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ for (Operator operator : children) {
+ maxPeekMemory = Math.max(maxPeekMemory,
operator.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, calculateMaxReturnSize());
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0;
+ long minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : children) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ currentRetainedSize += (maxReturnSize +
child.calculateRetainedSizeAfterCallingNext());
+ }
+ return currentRetainedSize - minChildReturnSize;
+ }
+
+ private boolean isInputNotEmpty(int index) {
+ return inputTsBlocks[index] != null && !inputTsBlocks[index].isEmpty();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 3014cdec431..81c27d885f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -174,8 +174,6 @@ public class Analysis {
// deviceViewSpecialProcess equals true when all Aggregation Functions and
DIFF
private boolean deviceViewSpecialProcess;
- private boolean existDeviceCrossRegion;
-
/////////////////////////////////////////////////////////////////////////////////////////////////
// Query Common Analysis (above DeviceView)
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -658,14 +656,6 @@ public class Analysis {
this.deviceViewSpecialProcess = deviceViewSpecialProcess;
}
- public boolean isExistDeviceCrossRegion() {
- return existDeviceCrossRegion;
- }
-
- public void setExistDeviceCrossRegion() {
- this.existDeviceCrossRegion = true;
- }
-
public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
return deviceViewIntoPathDescriptor;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index cbf8db730de..4ce99aa3737 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner;
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
import
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import
org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
@@ -45,6 +47,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationMergeSortOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
@@ -96,7 +99,6 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuter
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MultiColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
@@ -146,6 +148,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaC
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
+import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -166,6 +169,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
@@ -273,6 +277,7 @@ import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.getOutputColumnSizePerLine;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparator;
import static
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
import static
org.apache.iotdb.db.utils.TimestampPrecisionUtils.TIMESTAMP_PRECISION;
@@ -837,7 +842,68 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
operatorContext,
children,
dataTypes,
- MergeSortComparator.getComparator(sortItemList, sortItemIndexList,
sortItemDataTypeList));
+ getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
+ }
+
+ @Override
+ public Operator visitAggregationMergeSort(
+ AggregationMergeSortNode node, LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ MergeSortOperator.class.getSimpleName());
+ List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
+ List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
+
+ List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
+ if (!sortItemList.get(0).getSortKey().equalsIgnoreCase("Device")) {
+ throw new IllegalArgumentException(
+ "Only order by device align by device support
AggregationMergeSortNode.");
+ }
+
+ boolean timeAscending = true;
+ for (SortItem sortItem : sortItemList) {
+ if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getSortKey())
+ && (sortItem.getOrdering() == Ordering.DESC)) {
+ timeAscending = false;
+ break;
+ }
+ }
+
+ List<Accumulator> accumulators = new ArrayList<>();
+ for (Expression expression : node.getSelectExpressions()) {
+ if (expression instanceof FunctionExpression) {
+ FunctionExpression functionExpression = (FunctionExpression)
expression;
+ String aggregationName = functionExpression.getFunctionName();
+ Accumulator accumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.valueOf(aggregationName.toUpperCase()),
+
context.getTypeProvider().getType(functionExpression.getOutputSymbol()),
+ functionExpression.getExpressions(),
+ functionExpression.getFunctionAttributes(),
+ timeAscending);
+ accumulators.add(accumulator);
+ }
+ }
+
+ List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+ List<TSDataType> sortItemDataTypeList = new
ArrayList<>(sortItemList.size());
+ genSortInformation(
+ node.getOutputColumnNames(),
+ dataTypes,
+ sortItemList,
+ sortItemIndexList,
+ sortItemDataTypeList);
+ return new AggregationMergeSortOperator(
+ operatorContext,
+ children,
+ dataTypes,
+ accumulators,
+ node.isHasGroupBy(),
+ getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
}
@Override
@@ -866,7 +932,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
operatorContext,
children,
dataTypes,
- MergeSortComparator.getComparator(sortItemList, sortItemIndexList,
sortItemDataTypeList),
+ getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList),
node.getTopValue(),
!sortItemList.isEmpty()
&&
sortItemList.get(0).getSortKey().equalsIgnoreCase(OrderByKey.TIME)
@@ -1872,7 +1938,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
child,
dataTypes,
filePrefix,
- MergeSortComparator.getComparator(sortItemList, sortItemIndexList,
sortItemDataTypeList));
+ getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 450d37809c3..69066ec51ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
+import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -72,8 +72,11 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParame
import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -84,8 +87,16 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
+import static
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_IF;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.DIFF;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.TIME_DURATION;
public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext> {
@@ -123,7 +134,6 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
public List<PlanNode> visitSingleDeviceView(
SingleDeviceViewNode node, DistributionPlanContext context) {
- // Same process logic as visitDeviceView
if (analysis.isDeviceViewSpecialProcess()) {
List<PlanNode> rewroteChildren = rewrite(node.getChild(), context);
if (rewroteChildren.size() != 1) {
@@ -175,6 +185,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
// Step 1: constructs DeviceViewSplits
Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
+ boolean existDeviceCrossRegion = false;
for (int i = 0; i < node.getDevices().size(); i++) {
String outputDevice = node.getDevices().get(i);
@@ -187,10 +198,9 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
context.getPartitionTimeFilter()))
: new ArrayList<>(
analysis.getPartitionInfo(outputDevice,
context.getPartitionTimeFilter()));
- if (regionReplicaSets.size() > 1) {
- // specialProcess and existDeviceCrossRegion, use the old aggregation
logic
- analysis.setExistDeviceCrossRegion();
- if (analysis.isDeviceViewSpecialProcess()) {
+ if (regionReplicaSets.size() > 1 && !existDeviceCrossRegion) {
+ existDeviceCrossRegion = true;
+ if (analysis.isDeviceViewSpecialProcess() &&
aggregationCannotUseMergeSort()) {
return processSpecialDeviceView(node, context);
}
}
@@ -200,7 +210,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
// Step 2: Iterate all partition and create DeviceViewNode for each region
List<PlanNode> deviceViewNodeList = new ArrayList<>();
- if (analysis.isExistDeviceCrossRegion()) {
+ if (existDeviceCrossRegion) {
constructDeviceViewNodeListWithCrossRegion(
deviceViewNodeList, relatedDataRegions, deviceViewSplits, node,
context);
} else {
@@ -217,13 +227,103 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
return deviceViewNodeList;
}
- MergeSortNode mergeSortNode =
- new MergeSortNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
- node.getOutputColumnNames());
- deviceViewNodeList.forEach(mergeSortNode::addChild);
- return Collections.singletonList(mergeSortNode);
+ // aggregation and some device cross region, use AggregationMergeSortNode
+ // 1. generate old and new measurement idx relationship
+ // 2. generate new outputColumns for each subDeviceView
+ if (existDeviceCrossRegion && analysis.isDeviceViewSpecialProcess()) {
+ Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
+ List<String> newPartialOutputColumns = new ArrayList<>();
+ Set<Expression> deviceViewOutputExpressions =
analysis.getDeviceViewOutputExpressions();
+
+ int i = 0, newIdxSum = 0;
+ for (Expression expression : deviceViewOutputExpressions) {
+ if (i == 0) {
+ newPartialOutputColumns.add(expression.getOutputSymbol());
+ i++;
+ newIdxSum++;
+ continue;
+ }
+ FunctionExpression aggExpression = (FunctionExpression) expression;
+ // used for AVG, FIRST_VALUE, LAST_VALUE, TIME_DURATION agg function
+ List<String> actualPartialAggregationNames =
+ getActualPartialAggregationNames(aggExpression.getFunctionName());
+ for (String actualAggName : actualPartialAggregationNames) {
+ FunctionExpression partialFunctionExpression =
+ new FunctionExpression(
+ actualAggName,
+ aggExpression.getFunctionAttributes(),
+ aggExpression.getExpressions());
+ if (actualPartialAggregationNames.size() > 1) {
+ TSDataType dataType = analyzeExpression(analysis,
partialFunctionExpression);
+ context
+ .queryContext
+ .getTypeProvider()
+ .setType(partialFunctionExpression.getOutputSymbol(),
dataType);
+ }
+
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
+ }
+ newMeasurementIdxMap.put(
+ i++,
+ actualPartialAggregationNames.size() > 1
+ ? Arrays.asList(newIdxSum++, newIdxSum++)
+ : Collections.singletonList(newIdxSum++));
+ }
+
+ for (String device : node.getDevices()) {
+ List<Integer> oldMeasurementIdxList =
node.getDeviceToMeasurementIndexesMap().get(device);
+ List<Integer> newMeasurementIdxList = new ArrayList<>();
+ oldMeasurementIdxList.forEach(
+ idx ->
newMeasurementIdxList.addAll(newMeasurementIdxMap.get(idx)));
+ node.getDeviceToMeasurementIndexesMap().put(device,
newMeasurementIdxList);
+ }
+
+ for (PlanNode planNode : deviceViewNodeList) {
+ DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
+ deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
+ transferAggregatorsRecursively(planNode, context);
+ }
+
+ boolean hasGroupBy =
+ analysis.getGroupByTimeParameter() != null ||
analysis.hasGroupByParameter();
+ AggregationMergeSortNode mergeSortNode =
+ new AggregationMergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames(),
+ deviceViewOutputExpressions,
+ hasGroupBy);
+ deviceViewNodeList.forEach(mergeSortNode::addChild);
+ return Collections.singletonList(mergeSortNode);
+ } else {
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames());
+ deviceViewNodeList.forEach(mergeSortNode::addChild);
+ return Collections.singletonList(mergeSortNode);
+ }
+ }
+
+ /**
+ * Aggregation align by device uses the old aggregation logic when an aggregation is
+ * `count_if` or `diff`, or when it is used with a group by parameter (session, variation, count).
+ */
+ private boolean aggregationCannotUseMergeSort() {
+ if (analysis.hasGroupByParameter()) {
+ return true;
+ }
+
+ for (Expression expression : analysis.getDeviceViewOutputExpressions()) {
+ if (expression instanceof FunctionExpression) {
+ String functionName = ((FunctionExpression)
expression).getFunctionName();
+ if (COUNT_IF.equalsIgnoreCase(functionName) ||
DIFF.equalsIgnoreCase(functionName)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
}
private void constructDeviceViewNodeListWithCrossRegion(
@@ -286,10 +386,56 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
}
- @Override
- public List<PlanNode> visitAggregationMergeSort(
- AggregationMergeSortNode node, DistributionPlanContext context) {
- return null;
+ public List<String> getActualPartialAggregationNames(String aggregationType)
{
+ List<String> outputAggregationNames = new ArrayList<>();
+ switch (aggregationType) {
+ case AVG:
+ outputAggregationNames.add(SqlConstant.COUNT);
+ outputAggregationNames.add(SqlConstant.SUM);
+ break;
+ case FIRST_VALUE:
+ outputAggregationNames.add(FIRST_VALUE);
+ outputAggregationNames.add(SqlConstant.MIN_TIME);
+ break;
+ case LAST_VALUE:
+ outputAggregationNames.add(SqlConstant.LAST_VALUE);
+ outputAggregationNames.add(SqlConstant.MAX_TIME);
+ break;
+ case TIME_DURATION:
+ outputAggregationNames.add(SqlConstant.MAX_TIME);
+ outputAggregationNames.add(SqlConstant.MIN_TIME);
+ break;
+ default:
+ // TODO how about UDAF?
+ outputAggregationNames.add(aggregationType);
+ }
+ return outputAggregationNames;
+ }
+
+ private void transferAggregatorsRecursively(PlanNode planNode,
DistributionPlanContext context) {
+ List<AggregationDescriptor> descriptorList =
getAggregationDescriptors(planNode);
+ if (descriptorList != null) {
+ for (AggregationDescriptor descriptor : descriptorList) {
+ descriptor.setStep(
+ planNode instanceof SlidingWindowAggregationNode
+ ? AggregationStep.INTERMEDIATE
+ : AggregationStep.PARTIAL);
+ updateTypeProviderByPartialAggregation(descriptor,
context.queryContext.getTypeProvider());
+ }
+ }
+ planNode.getChildren().forEach(child ->
transferAggregatorsRecursively(child, context));
+ }
+
+ private List<AggregationDescriptor> getAggregationDescriptors(PlanNode
planNode) {
+ List<AggregationDescriptor> descriptorList = null;
+ if (planNode instanceof SeriesAggregationSourceNode) {
+ descriptorList = ((SeriesAggregationSourceNode)
planNode).getAggregationDescriptorList();
+ } else if (planNode instanceof AggregationNode) {
+ descriptorList = ((AggregationNode)
planNode).getAggregationDescriptorList();
+ } else if (planNode instanceof SlidingWindowAggregationNode) {
+ descriptorList = ((SlidingWindowAggregationNode)
planNode).getAggregationDescriptorList();
+ }
+ return descriptorList;
}
private List<PlanNode> processSpecialDeviceView(
@@ -621,9 +767,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
descriptor.getInputExpressions(),
descriptor.getInputAttributes())));
leafAggDescriptorList.forEach(
- d ->
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
- d, context.queryContext.getTypeProvider()));
+ d -> updateTypeProviderByPartialAggregation(d,
context.queryContext.getTypeProvider()));
List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
@@ -1307,7 +1451,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
if (keep) {
descriptorList.add(originalDescriptor);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
originalDescriptor, context.queryContext.getTypeProvider());
}
}
@@ -1349,8 +1493,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
descriptor.setStep(level == 0 ? AggregationStep.FINAL :
AggregationStep.INTERMEDIATE);
descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
descriptorList.add(descriptor);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
- descriptor, context.queryContext.getTypeProvider());
+ updateTypeProviderByPartialAggregation(descriptor,
context.queryContext.getTypeProvider());
}
handle.setGroupByLevelDescriptors(descriptorList);
}
@@ -1447,8 +1590,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
.forEach(
d -> {
d.setStep(AggregationStep.PARTIAL);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
- d, context.queryContext.getTypeProvider());
+ updateTypeProviderByPartialAggregation(d,
context.queryContext.getTypeProvider());
});
}
@@ -1486,7 +1628,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
} else {
eachSeriesOneRegion[0] = false;
descriptor.setStep(AggregationStep.PARTIAL);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
descriptor, context.queryContext.getTypeProvider());
}
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index 41c4cfb2b55..f4bced5a216 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -30,8 +31,10 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
public class AggregationMergeSortNode extends MultiChildProcessNode {
@@ -39,30 +42,53 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
private final List<String> outputColumns;
+ private final Set<Expression> selectExpressions;
+
+ private boolean hasGroupBy;
+
public AggregationMergeSortNode(
- PlanNodeId id, OrderByParameter mergeOrderParameter, List<String>
outputColumns) {
+ PlanNodeId id,
+ OrderByParameter mergeOrderParameter,
+ List<String> outputColumns,
+ Set<Expression> selectExpressions,
+ boolean hasGroupBy) {
super(id);
this.mergeOrderParameter = mergeOrderParameter;
this.outputColumns = outputColumns;
+ this.selectExpressions = selectExpressions;
+ this.hasGroupBy = hasGroupBy;
}
public AggregationMergeSortNode(
PlanNodeId id,
List<PlanNode> children,
OrderByParameter mergeOrderParameter,
- List<String> outputColumns) {
+ List<String> outputColumns,
+ Set<Expression> selectExpressions,
+ boolean hasGroupBy) {
super(id, children);
this.mergeOrderParameter = mergeOrderParameter;
this.outputColumns = outputColumns;
+ this.selectExpressions = selectExpressions;
+ this.hasGroupBy = hasGroupBy;
}
public OrderByParameter getMergeOrderParameter() {
return mergeOrderParameter;
}
+ public Set<Expression> getSelectExpressions() {
+ return this.selectExpressions;
+ }
+
+ public boolean isHasGroupBy() {
+ return this.hasGroupBy;
+ }
+
@Override
public PlanNode clone() {
- return new AggregationMergeSortNode(getPlanNodeId(),
getMergeOrderParameter(), outputColumns);
+ return new AggregationMergeSortNode(
+ getPlanNodeId(), getMergeOrderParameter(), outputColumns,
selectExpressions, hasGroupBy);
}
@Override
@@ -71,7 +97,9 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
new ArrayList<>(children.subList(startIndex, endIndex)),
getMergeOrderParameter(),
- outputColumns);
+ outputColumns,
+ selectExpressions,
+ hasGroupBy);
}
@Override
@@ -92,6 +120,11 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
for (String column : outputColumns) {
ReadWriteIOUtils.write(column, byteBuffer);
}
+ ReadWriteIOUtils.write(selectExpressions.size(), byteBuffer);
+ for (Expression expression : selectExpressions) {
+ Expression.serialize(expression, byteBuffer);
+ }
+ ReadWriteIOUtils.write(hasGroupBy, byteBuffer);
}
@Override
@@ -102,6 +135,11 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
for (String column : outputColumns) {
ReadWriteIOUtils.write(column, stream);
}
+ ReadWriteIOUtils.write(selectExpressions.size(), stream);
+ for (Expression expression : selectExpressions) {
+ Expression.serialize(expression, stream);
+ }
+ ReadWriteIOUtils.write(hasGroupBy, stream);
}
public static AggregationMergeSortNode deserialize(ByteBuffer byteBuffer) {
@@ -112,8 +150,16 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
columnSize--;
}
+ Set<Expression> expressions = new LinkedHashSet<>();
+ int expressionSize = ReadWriteIOUtils.readInt(byteBuffer);
+ while (expressionSize > 0) {
+ expressions.add(Expression.deserialize(byteBuffer));
+ expressionSize--;
+ }
+ boolean hasGroupBy = ReadWriteIOUtils.readBool(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new AggregationMergeSortNode(planNodeId, orderByParameter,
outputColumns);
+ return new AggregationMergeSortNode(
+ planNodeId, orderByParameter, outputColumns, expressions, hasGroupBy);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index 4d4b9b67049..bc7204a0d40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -52,7 +52,7 @@ public class DeviceViewNode extends MultiChildProcessNode {
private final List<String> devices = new ArrayList<>();
// Device column and measurement columns in result output
- private final List<String> outputColumnNames;
+ private List<String> outputColumnNames;
// e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 ->
[1, 3], s1 is 1 but
// not 0 because device is the first column
@@ -114,6 +114,10 @@ public class DeviceViewNode extends MultiChildProcessNode {
return outputColumnNames;
}
+ public void setOutputColumnNames(List<String> outputColumnNames) {
+ this.outputColumnNames = outputColumnNames;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitDeviceView(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 3b32baef3fa..b662b6f59da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -36,6 +36,8 @@ import java.util.Map;
import java.util.Objects;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.addPartialSuffix;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.STDDEV;
public class AggregationDescriptor {
@@ -127,7 +129,7 @@ public class AggregationDescriptor {
}
/** Keep the lower case of function name for partial result, and origin
value for others. */
- protected List<String> getActualAggregationNames(boolean isPartial) {
+ public List<String> getActualAggregationNames(boolean isPartial) {
List<String> outputAggregationNames = new ArrayList<>();
if (isPartial) {
switch (aggregationType) {
@@ -136,7 +138,7 @@ public class AggregationDescriptor {
outputAggregationNames.add(SqlConstant.SUM);
break;
case FIRST_VALUE:
- outputAggregationNames.add(SqlConstant.FIRST_VALUE);
+ outputAggregationNames.add(FIRST_VALUE);
outputAggregationNames.add(SqlConstant.MIN_TIME);
break;
case LAST_VALUE:
@@ -148,7 +150,7 @@ public class AggregationDescriptor {
outputAggregationNames.add(SqlConstant.MIN_TIME);
break;
case STDDEV:
- outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV));
+ outputAggregationNames.add(addPartialSuffix(STDDEV));
break;
case STDDEV_POP:
outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_POP));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
index b4b9d91e632..baf96bfcac3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -43,7 +44,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -779,10 +779,10 @@ public class AggregationDistributionTest {
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
- assertTrue(f1Root instanceof DeviceViewNode);
- assertTrue(f2Root instanceof HorizontallyConcatNode);
- assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
- assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+ assertTrue(f1Root instanceof AggregationMergeSortNode);
+ assertTrue(f2Root instanceof DeviceViewNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
+ assertEquals(1, f1Root.getChildren().get(0).getChildren().size());
}
@Test
@@ -803,13 +803,13 @@ public class AggregationDistributionTest {
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
PlanNode f3Root =
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
- assertTrue(f1Root instanceof DeviceViewNode);
- assertTrue(f2Root instanceof HorizontallyConcatNode);
- assertTrue(f3Root instanceof HorizontallyConcatNode);
- assertTrue(f3Root.getChildren().get(0) instanceof SeriesSourceNode);
- assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
+ assertTrue(f1Root instanceof AggregationMergeSortNode);
+ assertTrue(f2Root instanceof DeviceViewNode);
+ assertTrue(f3Root instanceof DeviceViewNode);
+ assertTrue(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(f1Root.getChildren().get(1) instanceof ExchangeNode);
- assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+ assertEquals(1, f1Root.getChildren().get(0).getChildren().size());
}
@Test
@@ -828,8 +828,8 @@ public class AggregationDistributionTest {
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
- assertTrue(f1Root instanceof DeviceViewNode);
- assertTrue(f2Root instanceof HorizontallyConcatNode);
+ assertTrue(f1Root instanceof AggregationMergeSortNode);
+ assertTrue(f2Root instanceof DeviceViewNode);
assertEquals(2, f1Root.getChildren().size());
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
index 750a86c4b3a..b32b231e71e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
@@ -26,11 +26,11 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
@@ -272,15 +272,15 @@ public class AlignByDeviceOrderByLimitOffsetTest {
assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
PlanNode filterNode = ((LimitNode)
firstFiRoot.getChildren().get(0)).getChild();
assertTrue(filterNode instanceof FilterNode);
- assertTrue(filterNode.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(filterNode.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
filterNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FilterNode);
+ instanceof AggregationNode);
PlanNode thirdFiRoot =
plan.getInstances().get(2).getFragment().getPlanNodeTree();
assertTrue(thirdFiRoot instanceof IdentitySinkNode);
- assertTrue(thirdFiRoot.getChildren().get(0) instanceof AggregationNode);
- assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0)
instanceof FilterNode);
+ assertTrue(thirdFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
}
/*
@@ -911,7 +911,7 @@ public class AlignByDeviceOrderByLimitOffsetTest {
}
}
- /*
+ /* BEFORE:
* IdentitySinkNode-35
* └──TransformNode-12
* └──SortNode-11
@@ -940,6 +940,36 @@ public class AlignByDeviceOrderByLimitOffsetTest {
* └──HorizontallyConcatNode-28
* ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d333.s2,
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion:
TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d333.s1,
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion:
TConsensusGroupId(type:DataRegion, id:4)]
+ *
+ * AFTER:
+ * IdentitySinkNode-43
+ * └──TransformNode-12
+ * └──MergeSort-32
+ * ├──SortNode-33
+ * │ └──DeviceView-19
+ * │ ├──FullOuterTimeJoinNode-15
+ * │ │ ├──SeriesAggregationScanNode-13:[SeriesPath:
root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:1)]
+ * │ │ └──SeriesAggregationScanNode-14:[SeriesPath:
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:1)]
+ * │ └──FullOuterTimeJoinNode-18
+ * │ ├──SeriesAggregationScanNode-16:[SeriesPath:
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, SINGLE)],
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * │ └──SeriesAggregationScanNode-17:[SeriesPath:
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, SINGLE)],
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * ├──ExchangeNode-37: [SourceAddress:192.0.3.1/test.2.0/40]
+ * ├──ExchangeNode-38: [SourceAddress:192.0.2.1/test.3.0/41]
+ * └──ExchangeNode-39: [SourceAddress:192.0.4.1/test.4.0/42]
+ *
+ * IdentitySinkNode-40
+ * └──SortNode-34
+ * └──DeviceView-23
+ * └──FullOuterTimeJoinNode-22
+ * ├──SeriesAggregationScanNode-20:[SeriesPath:
root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:3)]
+ * └──SeriesAggregationScanNode-21:[SeriesPath:
root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:3)]
+ *
+ * IdentitySinkNode-41
+ * └──SortNode-35
+ * └──DeviceView-27
+ * └──FullOuterTimeJoinNode-26
+ * ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d1.s2,
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:2)]
+ * └──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d1.s1,
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:2)]
*/
@Test
public void orderByExpressionTest3() {
@@ -955,29 +985,25 @@ public class AlignByDeviceOrderByLimitOffsetTest {
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TransformNode);
- assertTrue(firstFiTopNode.getChildren().get(0) instanceof SortNode);
- assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof SortNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof AggregationNode);
+ instanceof DeviceViewNode);
assertTrue(
-
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(1)
- instanceof ExchangeNode);
+
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0)
+ instanceof SortNode);
assertTrue(
-
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(2)
- instanceof AggregationNode);
- for (int i = 1; i < 4; i++) {
- assertTrue(
-
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
- instanceof HorizontallyConcatNode);
- }
+
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0)
+ instanceof SortNode);
for (int i = 0; i < 4; i++) {
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
0);
}
}
/*
- * IdentitySinkNode-34
+ * BEFORE:
+ * IdentitySinkNode-34
* └──TopK-10
* └──DeviceView-12
* ├──AggregationNode-17
@@ -1005,6 +1031,43 @@ public class AlignByDeviceOrderByLimitOffsetTest {
* ├──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d333.s2,
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion:
TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d333.s1,
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion:
TConsensusGroupId(type:DataRegion, id:4)]
*/
+
+ /* AFTER:
+ * IdentitySinkNode-41
+ * └──TopK-10
+ * ├──TopK-31
+ * │ └──DeviceView-18
+ * │ ├──FullOuterTimeJoinNode-14
+ * │ │ ├──SeriesAggregationScanNode-12:[SeriesPath:
root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:1)]
+ * │ │ └──SeriesAggregationScanNode-13:[SeriesPath:
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:1)]
+ * │ └──FullOuterTimeJoinNode-17
+ * │ ├──SeriesAggregationScanNode-15:[SeriesPath:
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, SINGLE)],
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * │ └──SeriesAggregationScanNode-16:[SeriesPath:
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, SINGLE)],
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * ├──ExchangeNode-35: [SourceAddress:192.0.3.1/test.2.0/38]
+ * ├──ExchangeNode-36: [SourceAddress:192.0.2.1/test.3.0/39]
+ * └──ExchangeNode-37: [SourceAddress:192.0.4.1/test.4.0/40]
+ *
+ * IdentitySinkNode-38
+ * └──TopK-32
+ * └──DeviceView-22
+ * └──FullOuterTimeJoinNode-21
+ * ├──SeriesAggregationScanNode-19:[SeriesPath:
root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:3)]
+ * └──SeriesAggregationScanNode-20:[SeriesPath:
root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:3)]
+ *
+ * IdentitySinkNode-39
+ * └──TopK-33
+ * └──DeviceView-26
+ * └──FullOuterTimeJoinNode-25
+ * ├──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d1.s2,
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:2)]
+ * └──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d1.s1,
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion:
TConsensusGroupId(type:DataRegion, id:2)]
+ *
+ * IdentitySinkNode-40
+ * └──TopK-34
+ * └──DeviceView-30
+ * └──FullOuterTimeJoinNode-29
+ * ├──SeriesAggregationScanNode-27:[SeriesPath:
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, SINGLE)],
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ * └──SeriesAggregationScanNode-28:[SeriesPath:
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, SINGLE)],
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ */
@Test
public void orderByExpressionTest4() {
// aggregation, order by expression, has LIMIT
@@ -1019,14 +1082,22 @@ public class AlignByDeviceOrderByLimitOffsetTest {
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
- assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
- assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(1)
instanceof ExchangeNode);
- assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(2)
instanceof AggregationNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
+ assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof DeviceViewNode);
+ assertTrue(
+
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof FullOuterTimeJoinNode);
for (int i = 1; i < 4; i++) {
assertTrue(
-
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
- instanceof HorizontallyConcatNode);
+ plan.getInstances()
+ .get(i)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof DeviceViewNode);
}
for (int i = 0; i < 4; i++) {
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
index f56a685a048..0e33d31c14d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -37,6 +38,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
import org.junit.Test;
@@ -61,24 +64,19 @@ public class AlignedByDeviceTest {
PlanNode f1Root =
plan.getInstances().get(0).getFragment().getPlanNodeTree();
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof SeriesSourceNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
- instanceof ExchangeNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
ExchangeNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
- assertTrue(f2Root.getChildren().get(1) instanceof SeriesSourceNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(
+ f2Root.getChildren().get(0).getChildren().get(0) instanceof
SeriesAggregationScanNode);
// test of MULTI_SERIES
sql = "select count(s1),count(s2) from root.sg.d333,root.sg.d4444 align by
device";
@@ -90,30 +88,19 @@ public class AlignedByDeviceTest {
f1Root = plan.getInstances().get(0).getFragment().getPlanNodeTree();
f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof SeriesSourceNode);
+ instanceof FullOuterTimeJoinNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(2)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(2)
- instanceof ExchangeNode);
+ instanceof FullOuterTimeJoinNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
ExchangeNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof HorizontallyConcatNode);
- assertTrue(f2Root.getChildren().get(1) instanceof HorizontallyConcatNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(f2Root.getChildren().get(0).getChildren().get(0) instanceof
FullOuterTimeJoinNode);
+ assertTrue(f2Root.getChildren().get(0).getChildren().get(1) instanceof
FullOuterTimeJoinNode);
}
@Test
@@ -132,76 +119,15 @@ public class AlignedByDeviceTest {
PlanNode f1Root =
plan.getInstances().get(0).getFragment().getPlanNodeTree();
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FilterNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
- instanceof FilterNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof ExchangeNode);
+ instanceof AggregationNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
- assertTrue(f2Root.getChildren().get(1) instanceof SeriesSourceNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(f2Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f2Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
// test of MULTI_SERIES
sql =
@@ -214,99 +140,19 @@ public class AlignedByDeviceTest {
f1Root = plan.getInstances().get(0).getFragment().getPlanNodeTree();
f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FilterNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(2)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof SeriesSourceNode);
+ instanceof AggregationNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
ExchangeNode);
assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(2)
- instanceof ExchangeNode);
+ f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
+ instanceof AggregationNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
- assertTrue(f2Root.getChildren().get(1) instanceof FullOuterTimeJoinNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(f2Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f2Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
}
@Test
@@ -679,25 +525,19 @@ public class AlignedByDeviceTest {
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree();
PlanNode f3Root =
plan.getInstances().get(2).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof SeriesSourceNode);
+ instanceof SeriesAggregationScanNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
- instanceof ExchangeNode);
+ instanceof SeriesAggregationScanNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
ExchangeNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(f3Root instanceof IdentitySinkNode);
- assertTrue(f3Root.getChildren().get(0) instanceof SeriesSourceNode);
+ assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
// test of MULTI_SERIES
sql = "select count(s1),count(s2) from root.sg.d1,root.sg.d333 align by
device";
@@ -710,33 +550,47 @@ public class AlignedByDeviceTest {
f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
f3Root = plan.getInstances().get(2).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof SeriesSourceNode);
+ instanceof FullOuterTimeJoinNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(2)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(2)
- instanceof ExchangeNode);
+ instanceof FullOuterTimeJoinNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
ExchangeNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
ExchangeNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof HorizontallyConcatNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(f3Root instanceof IdentitySinkNode);
- assertTrue(f3Root.getChildren().get(0) instanceof HorizontallyConcatNode);
+ assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
}
+ /*
+ * IdentitySinkNode-28
+ * └──AggregationMergeSort-23
+ * ├──DeviceView-14
+ * │ ├──AggregationNode-10
+ * │ │ └──FilterNode-9
+ * │ │ └──SeriesScanNode-8:[SeriesPath: root.sg.d1.s1,
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * │ └──AggregationNode-13
+ * │ └──FilterNode-12
+ * │ └──SeriesScanNode-11:[SeriesPath: root.sg.d333.s1,
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * ├──ExchangeNode-24: [SourceAddress:192.0.2.1/test.2.0/26]
+ * └──ExchangeNode-25: [SourceAddress:192.0.4.1/test.3.0/27]
+ *
+ * IdentitySinkNode-26
+ * └──DeviceView-18
+ * └──AggregationNode-17
+ * └──FilterNode-16
+ * └──SeriesScanNode-15:[SeriesPath: root.sg.d1.s1,
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+ *
+ * IdentitySinkNode-27
+ * └──DeviceView-22
+ * └──AggregationNode-21
+ * └──FilterNode-20
+ * └──SeriesScanNode-19:[SeriesPath: root.sg.d333.s1,
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ */
@Test
public void testAggregation2Device3RegionWithValueFilter() {
QueryId queryId = new QueryId("test");
@@ -754,77 +608,28 @@ public class AlignedByDeviceTest {
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree();
PlanNode f3Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FilterNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
- f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
- instanceof FilterNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
+ instanceof AggregationNode);
assertTrue(
f1Root
.getChildren()
.get(0)
.getChildren()
- .get(1)
- .getChildren()
.get(0)
.getChildren()
.get(0)
.getChildren()
.get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
.getChildren()
.get(0)
- .getChildren()
- .get(1)
- instanceof ExchangeNode);
+ instanceof SeriesScanNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(f3Root instanceof IdentitySinkNode);
- assertTrue(f3Root.getChildren().get(0) instanceof SeriesSourceNode);
+ assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
// test of MULTI_SERIES
sql = "select count(s1),count(s2) from root.sg.d1,root.sg.d333 where s1 <=
4 align by device";
@@ -837,100 +642,15 @@ public class AlignedByDeviceTest {
f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
f3Root = plan.getInstances().get(2).getFragment().getPlanNodeTree();
assertTrue(f1Root instanceof IdentitySinkNode);
- assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof
AggregationMergeSortNode);
+ assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof
DeviceViewNode);
assertTrue(
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FilterNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(2)
- instanceof ExchangeNode);
- assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof
AggregationNode);
- assertTrue(
-
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
- instanceof FullOuterTimeJoinNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- instanceof SeriesSourceNode);
- assertTrue(
- f1Root
- .getChildren()
- .get(0)
- .getChildren()
- .get(1)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(2)
- instanceof ExchangeNode);
+ instanceof AggregationNode);
assertTrue(f2Root instanceof IdentitySinkNode);
- assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+ assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(f3Root instanceof IdentitySinkNode);
- assertTrue(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+ assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
}
@Test
diff --git
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
index b89ef13778b..143c0e0b7f0 100644
---
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
+++
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-
timestamp_precision=ms
udf_lib_dir=target/datanode1/ext/udf
trigger_lib_dir=target/datanode1/ext/trigger
diff --git
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
index 83dbc1b051e..1b48e21f303 100644
---
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
+++
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-
timestamp_precision=ms
udf_lib_dir=target/datanode3/ext/udf
trigger_lib_dir=target/datanode3/ext/trigger