This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/agg_plan_device_cross_region
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/beyyes/agg_plan_device_cross_region by this push:
new b480e5964ff temp
b480e5964ff is described below
commit b480e5964ffd689da6dc0803de63cf6a0dc0b3cb
Author: Beyyes <[email protected]>
AuthorDate: Wed Feb 28 01:00:22 2024 +0800
temp
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../process/AggregationMergeSortOperator.java | 138 +++++++++++++++++
.../plan/planner/OperatorTreeGenerator.java | 35 +++++
.../plan/planner/distribution/SourceRewriter.java | 170 +++++++++++++++++++--
.../node/process/AggregationMergeSortNode.java | 22 ++-
.../planner/plan/node/process/DeviceViewNode.java | 6 +-
.../plan/parameter/AggregationDescriptor.java | 11 +-
.../datanode1conf/iotdb-common.properties | 3 +
.../datanode2conf/iotdb-common.properties | 3 +
.../datanode3conf/iotdb-common.properties | 4 +-
11 files changed, 368 insertions(+), 28 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 608910104f1..d7f91373dc6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -72,7 +72,7 @@ public class ConfigNodeConfig {
private int dataReplicationFactor = 1;
/** Number of SeriesPartitionSlots per Database. */
- private int seriesSlotNum = 1000;
+ private int seriesSlotNum = 5;
/** SeriesPartitionSlot executor class. */
private String seriesPartitionExecutorClass =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 854401d7b5e..b18a7d42dcb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -585,7 +585,7 @@ public class IoTDBConfig {
private long cacheFileReaderClearPeriod = 100000;
/** the max executing time of query in ms. Unit: millisecond */
- private long queryTimeoutThreshold = 60000;
+ private long queryTimeoutThreshold = 60000000;
/** the max time to live of a session in ms. Unit: millisecond */
private int sessionTimeoutThreshold = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
new file mode 100644
index 00000000000..c280ab2eb36
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Work-in-progress operator that consumes all of its child operators and emits one merged
+ * {@link TsBlock} per call to {@link #next()}.
+ *
+ * <p>At this stage {@link #next()} only copies the first row's timestamp and the first value
+ * column (read as binary) from every child block that currently holds data; no aggregation or
+ * ordering is performed yet.
+ */
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
+
+  // private final ITimeRangeIterator timeRangeIterator;
+
+  // Current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
+  private final List<TSDataType> dataTypes;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  // noMoreTsBlocks[i] is set once child i reported no further data and was closed
+  private final boolean[] noMoreTsBlocks;
+
+  // Cached result of isFinished(); also short-circuits hasNext()
+  private boolean finished;
+
+  private boolean currentFinished;
+
+  private String currentDevice;
+
+  private long currentTime;
+
+  /**
+   * @param operatorContext context this operator runs in
+   * @param children child operators whose output is merged
+   * @param dataTypes data types of the value columns of the produced blocks
+   */
+  public AggregationMergeSortOperator(
+      OperatorContext operatorContext, List<Operator> children, List<TSDataType> dataTypes) {
+    super(operatorContext, children);
+    this.dataTypes = dataTypes;
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+  }
+
+  /**
+   * Builds one output block from the first row of every child block that currently has data.
+   *
+   * @return the built block, or {@code null} when the inputs are not ready yet
+   * @throws Exception if preparing the inputs fails
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    // NOTE(review): neither value is used yet; presumably reserved for bounding the run time.
+    long startTime = System.nanoTime();
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+    // 1. fill consumed up TsBlock
+    if (!prepareInput()) {
+      return null;
+    }
+
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      // hasNext() clears the slot of an exhausted child (inputTsBlocks[i] = null), so slots
+      // without data must be skipped instead of dereferenced.
+      if (isEmpty(i)) {
+        continue;
+      }
+      TsBlock tsBlock = inputTsBlocks[i];
+      timeBuilder.writeLong(tsBlock.getTimeColumn().getLong(0));
+      valueColumnBuilders[0].writeBinary(tsBlock.getValueColumns()[0].getBinary(0));
+    }
+
+    // NOTE(review): no row positions are declared on tsBlockBuilder, only value column 0 is
+    // written, and the input slots are never marked as consumed - confirm before relying on this.
+    return tsBlockBuilder.build();
+  }
+
+  /**
+   * Reports whether any child still holds or may still produce data. A child that reports no
+   * further data is closed, removed from {@code children} and marked in {@code noMoreTsBlocks}.
+   *
+   * @return {@code true} if at least one child has buffered data or may deliver more
+   * @throws Exception if querying or closing a child fails
+   */
+  @Override
+  public boolean hasNext() throws Exception {
+    // TODO the child of DeviceViewNode already calc TimeRange?
+    // return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
+
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        // buffered data is waiting for this child
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
+          return true;
+        } else {
+          // child is exhausted: release it and remember that its slot stays empty
+          children.get(i).close();
+          children.set(i, null);
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns {@code true} once every child is marked exhausted and its input slot is empty. The
+   * result is cached in {@code finished}.
+   *
+   * @throws Exception declared by the operator contract
+   */
+  @Override
+  public boolean isFinished() throws Exception {
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] || !isEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  /** Memory estimation is not implemented yet; always reports 0. */
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  /** Memory estimation is not implemented yet; always reports 0. */
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  /** Memory estimation is not implemented yet; always reports 0. */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 1b5f9508638..61174b5e776 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -45,6 +45,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationMergeSortOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
@@ -166,6 +167,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
@@ -857,6 +859,39 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
MergeSortComparator.getComparator(sortItemList, sortItemIndexList,
sortItemDataTypeList));
}
+  /**
+   * Creates the {@link AggregationMergeSortOperator} for an {@link AggregationMergeSortNode}.
+   *
+   * @param node plan node whose children supply the operator's inputs
+   * @param context local execution plan context used to register the operator and resolve the
+   *     output column types
+   * @return the operator consuming all child operators of {@code node}
+   */
+  @Override
+  public Operator visitAggregationMergeSort(
+      AggregationMergeSortNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                // Register under the operator actually built below, not MergeSortOperator.
+                AggregationMergeSortOperator.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
+
+    // TODO: create one Accumulator per aggregate FunctionExpression of the select expressions
+    // (AccumulatorFactory.createAccumulator) and pass them to the operator.
+
+    return new AggregationMergeSortOperator(operatorContext, children, dataTypes);
+  }
+
@Override
public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 53814483e4c..57c807c8a41 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -72,8 +73,10 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParame
import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -84,8 +87,13 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
+import static
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.TIME_DURATION;
public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext> {
@@ -123,7 +131,6 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
public List<PlanNode> visitSingleDeviceView(
SingleDeviceViewNode node, DistributionPlanContext context) {
- // Same process logic as visitDeviceView
if (analysis.isDeviceViewSpecialProcess()) {
List<PlanNode> rewroteChildren = rewrite(node.getChild(), context);
if (rewroteChildren.size() != 1) {
@@ -190,9 +197,10 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
if (regionReplicaSets.size() > 1) {
// specialProcess and existDeviceCrossRegion, use the old aggregation
logic
analysis.setExistDeviceCrossRegion();
- if (analysis.isDeviceViewSpecialProcess()) {
- return processSpecialDeviceView(node, context);
- }
+ // TODO group by session, variation, count, count_if do not use old
logic
+ // if (analysis.isDeviceViewSpecialProcess()) {
+ // return processSpecialDeviceView(node, context);
+ // }
}
deviceViewSplits.add(new DeviceViewSplit(outputDevice, child,
regionReplicaSets));
relatedDataRegions.addAll(regionReplicaSets);
@@ -217,13 +225,78 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
return deviceViewNodeList;
}
- MergeSortNode mergeSortNode =
- new MergeSortNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
- node.getOutputColumnNames());
- deviceViewNodeList.forEach(mergeSortNode::addChild);
- return Collections.singletonList(mergeSortNode);
+ if (analysis.isExistDeviceCrossRegion() &&
analysis.isDeviceViewSpecialProcess()) {
+ // return processSpecialDeviceView(node, context);
+
+ // TODO 1. generate old and new measurement idx relationship 2. generate
new outputColumns for
+ // each subDeviceView
+// Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
+// List<String> newPartialOutputColumns = new ArrayList<>();
+//
+ Set<Expression> selectExpressions = analysis.getSelectExpressions();
+//
+// int i = 0, idxSum = 0;
+// for (Expression expression : selectExpressions) {
+// if (i == 0) {
+// // device
+// newPartialOutputColumns.add(expression.getOutputSymbol());
+// i++;
+// idxSum++;
+// continue;
+// }
+// FunctionExpression aggExpression = (FunctionExpression) expression;
+// List<String> actualPartialAggregationNames =
+//
getActualPartialAggregationNames(aggExpression.getFunctionName());
+// for (String actualAggName : actualPartialAggregationNames) {
+// newPartialOutputColumns.add(
+// new FunctionExpression(
+// actualAggName,
+// aggExpression.getFunctionAttributes(),
+// aggExpression.getExpressions())
+// .getOutputSymbol());
+// }
+// // TODO need update typeProvider?
+// if (actualPartialAggregationNames.size() > 1) {
+// newMeasurementIdxMap.put(i, Arrays.asList(idxSum++, idxSum++));
+// } else {
+// newMeasurementIdxMap.put(i, Collections.singletonList(idxSum++));
+// }
+// i++;
+// }
+
+// for (String device : node.getDevices()) {
+// List<Integer> oldMeasurementList =
+// node.getDeviceToMeasurementIndexesMap().get(device);
+// List<Integer> newMeasurementList = new ArrayList<>();
+// for (int idx : oldMeasurementList) {
+// newMeasurementList.addAll(newMeasurementIdxMap.get(idx));
+// }
+// node.getDeviceToMeasurementIndexesMap().put(device,
newMeasurementList);
+// }
+
+ for (PlanNode planNode : deviceViewNodeList) {
+ DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
+ // deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
+ transferAggregatorsRecursively2(planNode, context);
+ }
+
+ AggregationMergeSortNode mergeSortNode =
+ new AggregationMergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames(),
+ selectExpressions);
+ deviceViewNodeList.forEach(mergeSortNode::addChild);
+ return Collections.singletonList(mergeSortNode);
+ } else {
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames());
+ deviceViewNodeList.forEach(mergeSortNode::addChild);
+ return Collections.singletonList(mergeSortNode);
+ }
}
private void constructDeviceViewNodeListWithCrossRegion(
@@ -286,9 +359,74 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
}
+  /**
+   * Returns the names of the partial aggregations that stand in for the given aggregation:
+   * avg -> count, sum; first_value -> first_value, min_time; last_value -> last_value, max_time;
+   * time_duration -> max_time, min_time. Any other name is returned unchanged as the single
+   * element.
+   *
+   * @param aggregationType name of the aggregation function
+   * @return a new mutable list with the partial aggregation names, in the order given above
+   */
+  public List<String> getActualPartialAggregationNames(String aggregationType) {
+    final String[] partialNames;
+    switch (aggregationType) {
+      case AVG:
+        partialNames = new String[] {SqlConstant.COUNT, SqlConstant.SUM};
+        break;
+      case FIRST_VALUE:
+        partialNames = new String[] {FIRST_VALUE, SqlConstant.MIN_TIME};
+        break;
+      case LAST_VALUE:
+        partialNames = new String[] {SqlConstant.LAST_VALUE, SqlConstant.MAX_TIME};
+        break;
+      case TIME_DURATION:
+        partialNames = new String[] {SqlConstant.MAX_TIME, SqlConstant.MIN_TIME};
+        break;
+      default:
+        // TODO how about UDAF?
+        partialNames = new String[] {aggregationType};
+    }
+    // Copy into an ArrayList so callers keep receiving a growable list.
+    return new ArrayList<>(Arrays.asList(partialNames));
+  }
+
+  /**
+   * Visits {@code planNode} and then all of its descendants. For every
+   * SeriesAggregationSourceNode encountered, each aggregation descriptor is switched to
+   * {@code AggregationStep.PARTIAL} and the query's type provider is updated for it.
+   *
+   * @param planNode root of the subtree to process (the root itself is included)
+   * @param context distribution plan context supplying the query's type provider
+   */
+  private void transferAggregatorsRecursively2(
+      PlanNode planNode, DistributionPlanContext context) {
+    if (planNode instanceof SeriesAggregationSourceNode) {
+      ((SeriesAggregationSourceNode) planNode)
+          .getAggregationDescriptorList()
+          .forEach(
+              aggDescriptor -> {
+                aggDescriptor.setStep(AggregationStep.PARTIAL);
+                updateTypeProviderByPartialAggregation(
+                    aggDescriptor, context.queryContext.getTypeProvider());
+              });
+    }
+
+    planNode.getChildren().forEach(subNode -> transferAggregatorsRecursively2(subNode, context));
+  }
+
+  /**
+   * Rewrites the aggregation descriptors of every SeriesAggregationSourceNode below
+   * {@code planNode}. Each original descriptor is replaced by one {@code AggregationStep.PARTIAL}
+   * descriptor per name returned by {@code getActualAggregationNames(true)}, reusing the
+   * original's input expressions and attributes. {@code planNode} itself is not rewritten, only
+   * its descendants.
+   *
+   * @param planNode root of the subtree whose descendants are processed
+   */
+  private void transferAggregatorsRecursively(PlanNode planNode) {
+    for (PlanNode subNode : planNode.getChildren()) {
+      // Descend first so deeper source nodes are rewritten before this child.
+      transferAggregatorsRecursively(subNode);
+
+      if (!(subNode instanceof SeriesAggregationSourceNode)) {
+        continue;
+      }
+
+      SeriesAggregationSourceNode sourceNode = (SeriesAggregationSourceNode) subNode;
+      List<AggregationDescriptor> partialDescriptors = new ArrayList<>();
+      for (AggregationDescriptor original : sourceNode.getAggregationDescriptorList()) {
+        for (String partialName : original.getActualAggregationNames(true)) {
+          partialDescriptors.add(
+              new AggregationDescriptor(
+                  partialName,
+                  AggregationStep.PARTIAL,
+                  original.getInputExpressions(),
+                  original.getInputAttributes()));
+        }
+      }
+      sourceNode.setAggregationDescriptorList(partialDescriptors);
+    }
+  }
+
@Override
public List<PlanNode> visitAggregationMergeSort(
AggregationMergeSortNode node, DistributionPlanContext context) {
+ // TODO remove this method?
return null;
}
@@ -622,7 +760,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
descriptor.getInputAttributes())));
leafAggDescriptorList.forEach(
d ->
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
d, context.queryContext.getTypeProvider()));
List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
@@ -1308,7 +1446,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
if (keep) {
descriptorList.add(originalDescriptor);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
originalDescriptor, context.queryContext.getTypeProvider());
}
}
@@ -1352,7 +1490,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
descriptor.setStep(level == 0 ? AggregationStep.FINAL :
AggregationStep.INTERMEDIATE);
descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
descriptorList.add(descriptor);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
descriptor, context.queryContext.getTypeProvider());
}
handle.setGroupByLevelDescriptors(descriptorList);
@@ -1450,7 +1588,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
.forEach(
d -> {
d.setStep(AggregationStep.PARTIAL);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
d, context.queryContext.getTypeProvider());
});
}
@@ -1489,7 +1627,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
} else {
eachSeriesOneRegion[0] = false;
descriptor.setStep(AggregationStep.PARTIAL);
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ updateTypeProviderByPartialAggregation(
descriptor, context.queryContext.getTypeProvider());
}
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index 41c4cfb2b55..c446715efae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -32,6 +33,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
public class AggregationMergeSortNode extends MultiChildProcessNode {
@@ -39,21 +41,29 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
private final List<String> outputColumns;
+ private final Set<Expression> selectExpressions;
+
public AggregationMergeSortNode(
- PlanNodeId id, OrderByParameter mergeOrderParameter, List<String>
outputColumns) {
+ PlanNodeId id,
+ OrderByParameter mergeOrderParameter,
+ List<String> outputColumns,
+ Set<Expression> selectExpressions) {
super(id);
this.mergeOrderParameter = mergeOrderParameter;
this.outputColumns = outputColumns;
+ this.selectExpressions = selectExpressions;
}
public AggregationMergeSortNode(
PlanNodeId id,
List<PlanNode> children,
OrderByParameter mergeOrderParameter,
- List<String> outputColumns) {
+ List<String> outputColumns,
+ Set<Expression> selectExpressions) {
super(id, children);
this.mergeOrderParameter = mergeOrderParameter;
this.outputColumns = outputColumns;
+ this.selectExpressions = selectExpressions;
}
public OrderByParameter getMergeOrderParameter() {
@@ -62,7 +72,8 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new AggregationMergeSortNode(getPlanNodeId(),
getMergeOrderParameter(), outputColumns);
+ return new AggregationMergeSortNode(
+ getPlanNodeId(), getMergeOrderParameter(), outputColumns,
selectExpressions);
}
@Override
@@ -71,7 +82,8 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
new ArrayList<>(children.subList(startIndex, endIndex)),
getMergeOrderParameter(),
- outputColumns);
+ outputColumns,
+ selectExpressions);
}
@Override
@@ -113,7 +125,7 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
columnSize--;
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new AggregationMergeSortNode(planNodeId, orderByParameter,
outputColumns);
+ return new AggregationMergeSortNode(planNodeId, orderByParameter,
outputColumns, null);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index 4d4b9b67049..bc7204a0d40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -52,7 +52,7 @@ public class DeviceViewNode extends MultiChildProcessNode {
private final List<String> devices = new ArrayList<>();
// Device column and measurement columns in result output
- private final List<String> outputColumnNames;
+ private List<String> outputColumnNames;
// e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 ->
[1, 3], s1 is 1 but
// not 0 because device is the first column
@@ -114,6 +114,10 @@ public class DeviceViewNode extends MultiChildProcessNode {
return outputColumnNames;
}
+ /**
+ * Replaces the output column names of this node.
+ *
+ * @param outputColumnNames new column names; the list is stored by reference, not copied
+ */
+ public void setOutputColumnNames(List<String> outputColumnNames) {
+ this.outputColumnNames = outputColumnNames;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitDeviceView(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 69eb807711e..ff6abfe2017 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -36,8 +36,13 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.addPartialSuffix;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.isBuiltinAggregationName;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.STDDEV;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.TIME_DURATION;
public class AggregationDescriptor {
@@ -143,7 +148,7 @@ public class AggregationDescriptor {
}
/** Keep the lower case of function name for partial result, and origin
value for others. */
- protected List<String> getActualAggregationNames(boolean isPartial) {
+ public List<String> getActualAggregationNames(boolean isPartial) {
List<String> outputAggregationNames = new ArrayList<>();
if (isPartial) {
switch (aggregationType) {
@@ -152,7 +157,7 @@ public class AggregationDescriptor {
outputAggregationNames.add(SqlConstant.SUM);
break;
case FIRST_VALUE:
- outputAggregationNames.add(SqlConstant.FIRST_VALUE);
+ outputAggregationNames.add(FIRST_VALUE);
outputAggregationNames.add(SqlConstant.MIN_TIME);
break;
case LAST_VALUE:
@@ -164,7 +169,7 @@ public class AggregationDescriptor {
outputAggregationNames.add(SqlConstant.MIN_TIME);
break;
case STDDEV:
- outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV));
+ outputAggregationNames.add(addPartialSuffix(STDDEV));
break;
case STDDEV_POP:
outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_POP));
diff --git
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
index b89ef13778b..4ef45b29710 100644
---
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
+++
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
@@ -17,6 +17,9 @@
# under the License.
#
+query_timeout_threshold=60000000
+series_slot_num=5
+data_replication_factor=1
timestamp_precision=ms
udf_lib_dir=target/datanode1/ext/udf
trigger_lib_dir=target/datanode1/ext/trigger
diff --git
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
index 9cf060d61fd..1d3ce663a5b 100644
---
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
+++
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
@@ -16,6 +16,9 @@
# specific language governing permissions and limitations
# under the License.
#
+query_timeout_threshold=60000000
+series_slot_num=5
+data_replication_factor=1
timestamp_precision=ms
udf_lib_dir=target/datanode2/ext/udf
trigger_lib_dir=target/datanode2/ext/trigger
diff --git
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
index 83dbc1b051e..cac71c4ccfd 100644
---
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
+++
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
@@ -16,7 +16,9 @@
# specific language governing permissions and limitations
# under the License.
#
-
+query_timeout_threshold=60000000
+series_slot_num=5
+data_replication_factor=1
timestamp_precision=ms
udf_lib_dir=target/datanode3/ext/udf
trigger_lib_dir=target/datanode3/ext/trigger