This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/agg_plan_device_cross_region
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/beyyes/agg_plan_device_cross_region by this push:
new 1dddfda57e7 temp
1dddfda57e7 is described below
commit 1dddfda57e730e05606a4ceed081cc1aa08bf871
Author: Beyyes <[email protected]>
AuthorDate: Wed Feb 28 21:13:53 2024 +0800
temp
---
.../execution/aggregation/Accumulator.java | 5 +
.../execution/aggregation/AvgAccumulator.java | 5 +
.../execution/aggregation/CountAccumulator.java | 5 +
.../execution/aggregation/CountIfAccumulator.java | 5 +
.../aggregation/CountTimeAccumulator.java | 5 +
.../execution/aggregation/ExtremeAccumulator.java | 5 +
.../aggregation/FirstValueAccumulator.java | 5 +
.../aggregation/LastValueAccumulator.java | 5 +
.../execution/aggregation/MaxValueAccumulator.java | 5 +
.../execution/aggregation/MinTimeAccumulator.java | 5 +
.../execution/aggregation/MinValueAccumulator.java | 5 +
.../aggregation/TimeDurationAccumulator.java | 5 +
.../execution/aggregation/UDAFAccumulator.java | 5 +
.../execution/aggregation/VarianceAccumulator.java | 5 +
.../process/AggregationMergeSortOperator.java | 173 ++++++++++++++++++---
.../plan/planner/OperatorTreeGenerator.java | 68 +++++---
.../plan/planner/distribution/SourceRewriter.java | 126 ++++++++-------
.../node/process/AggregationMergeSortNode.java | 25 ++-
.../plan/parameter/AggregationDescriptor.java | 2 +-
19 files changed, 361 insertions(+), 103 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
index 9386903f493..7d765aa5857 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
@@ -85,4 +85,9 @@ public interface Accumulator {
TSDataType[] getIntermediateType();
TSDataType getFinalType();
+
+ default int getPartialResultSize() { // presumably the number of partial (intermediate) result columns — TODO confirm against callers
+ throw new UnsupportedOperationException( // default: an accumulator that does not override this method rejects the call
+ "This type of accumulator does not support getPartialResultSize!");
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
index 318dbb7ada2..66ff521ce69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
@@ -163,6 +163,11 @@ public class AvgAccumulator implements Accumulator {
return TSDataType.DOUBLE;
}
+ @Override
+ public int getPartialResultSize() {
+ return 2; // reports two partial results; NOTE(review): presumably count and sum — confirm against getIntermediateType()
+ }
+
private void addIntInput(Column[] column, BitMap bitMap) {
int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
index d7e1d621033..3273629490f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
@@ -121,4 +121,9 @@ public class CountAccumulator implements Accumulator {
public TSDataType getFinalType() {
return TSDataType.INT64;
}
+
+ @Override
+ public int getPartialResultSize() {
+   // COUNT hands over exactly one partial result, like COUNT_IF and COUNT_TIME.
+   // SourceRewriter reserves one column index for every aggregation that is not split into
+   // several partial aggregations, so reporting 0 would disagree with that column layout.
+   return 1;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
index 8638ef6e3c6..0f20730618e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
@@ -143,4 +143,9 @@ public class CountIfAccumulator implements Accumulator {
public TSDataType getFinalType() {
return TSDataType.INT64;
}
+
+ @Override
+ public int getPartialResultSize() {
+ return 1; // COUNT_IF reports a single partial result
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
index a8a7d888762..e7ed4e57ac2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
@@ -108,4 +108,9 @@ public class CountTimeAccumulator implements Accumulator {
public TSDataType getFinalType() {
return TSDataType.INT64;
}
+
+ @Override
+ public int getPartialResultSize() {
+ return 1; // COUNT_TIME reports a single partial result
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
index 2ae041cf1c8..fc20b23ac32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
@@ -225,6 +225,11 @@ public class ExtremeAccumulator implements Accumulator {
return extremeResult.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 1; // the extreme value is reported as a single partial result
+ }
+
private void addIntInput(Column[] column, BitMap bitMap) {
int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
index d4262d453b2..8514f67091d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
@@ -252,6 +252,11 @@ public class FirstValueAccumulator implements Accumulator {
return firstValue.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 2; // reports two partial results; NOTE(review): presumably the first value and its timestamp — confirm against getIntermediateType()
+ }
+
protected void addIntInput(Column[] column, BitMap bitMap) {
int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
index 9891b1a528f..6d38324559a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
@@ -252,6 +252,11 @@ public class LastValueAccumulator implements Accumulator {
return lastValue.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 2; // reports two partial results; NOTE(review): presumably the last value and its timestamp — confirm against getIntermediateType()
+ }
+
protected void addIntInput(Column[] column, BitMap bitMap) {
int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
index 15c07ba154c..7b9f4b9e3c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
@@ -223,6 +223,11 @@ public class MaxValueAccumulator implements Accumulator {
return maxResult.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 1; // the maximum value is reported as a single partial result
+ }
+
private void addIntInput(Column[] column, BitMap bitMap) {
int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
index a3d0ec9a3a4..8a99be7cba2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
@@ -119,6 +119,11 @@ public class MinTimeAccumulator implements Accumulator {
return TSDataType.INT64;
}
+ @Override
+ public int getPartialResultSize() {
+ return 1; // the minimum time is reported as a single partial result
+ }
+
protected void updateMinTime(long curTime) {
hasCandidateResult = true;
minTime = Math.min(minTime, curTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
index 14c18e1f45c..e768cffab39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
@@ -223,6 +223,11 @@ public class MinValueAccumulator implements Accumulator {
return minResult.getDataType();
}
+ @Override
+ public int getPartialResultSize() {
+ return 1; // the minimum value is reported as a single partial result
+ }
+
private void addIntInput(Column[] column, BitMap bitMap) {
int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
index 095cece46ea..b0a8dd9f33d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
@@ -113,6 +113,11 @@ public class TimeDurationAccumulator implements
Accumulator {
return TSDataType.INT64;
}
+ @Override
+ public int getPartialResultSize() {
+ return 2; // reports two partial results; NOTE(review): presumably min time and max time — confirm against getIntermediateType()
+ }
+
protected void updateMaxTime(long curTime) {
initResult = true;
maxTime = Math.max(maxTime, curTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
index e9e875205f8..fd4065d43ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
@@ -195,6 +195,11 @@ public class UDAFAccumulator implements Accumulator {
return
UDFDataTypeTransformer.transformToTsDataType(configurations.getOutputDataType());
}
+ @Override
+ public int getPartialResultSize() {
+ return 1; // reports a single partial result; NOTE(review): assumes every UDAF state fits one column — confirm
+ }
+
private void onError(String methodName, Exception e) {
LOGGER.warn(
"Error occurred during executing UDAF, please check whether the
implementation of UDF is correct according to the udf-api description.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
index 1dfc265f6b1..f3604bcb5cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
@@ -211,6 +211,11 @@ public class VarianceAccumulator implements Accumulator {
return TSDataType.DOUBLE;
}
+ @Override
+ public int getPartialResultSize() {
+ return 1; // reports a single partial result; NOTE(review): confirm the intermediate variance state is carried in one column
+ }
+
private void addIntInput(Column[] columns, BitMap bitmap) {
int size = columns[0].getPositionCount();
for (int i = 0; i < size; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
index c280ab2eb36..b83e491e522 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.db.queryengine.execution.operator.process;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -28,15 +31,22 @@ import
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory.createAccumulator;
- // private final ITimeRangeIterator timeRangeIterator;
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
- // Current interval of aggregation window [curStartTime, curEndTime)
- private TimeRange curTimeRange;
+ private List<Accumulator> accumulators;
private final List<TSDataType> dataTypes;
private final TsBlockBuilder tsBlockBuilder;
@@ -45,18 +55,37 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
private boolean finished;
+ private Map<String, List<Aggregator>> aggMap;
+
+ private final TimeComparator timeComparator;
+
+ private final Comparator<Binary> deviceComparator;
+
private boolean currentFinished;
- private String currentDevice;
+ private Binary currentDevice;
private long currentTime;
+ private int[] readIndex;
+
+ List<Integer> newAggregationIdx;
+
public AggregationMergeSortOperator(
- OperatorContext operatorContext, List<Operator> children,
List<TSDataType> dataTypes) {
+ OperatorContext operatorContext,
+ List<Operator> children,
+ List<TSDataType> dataTypes, // column types used to create the output TsBlockBuilder
+ List<Accumulator> accumulators, // fed by next() through addIntermediate and drained through outputFinal
+ TimeComparator timeComparator, // stored only; not referenced in the visible part of this class
+ Comparator<Binary> deviceComparator) { // next() uses it to pick the smallest device among the child blocks
super(operatorContext, children);
this.dataTypes = dataTypes;
this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+ this.accumulators = accumulators; // NOTE(review): newAggregationIdx and aggMap are not assigned in this constructor, yet next() reads newAggregationIdx
+ this.timeComparator = timeComparator;
+ this.deviceComparator = deviceComparator;
+ readIndex = new int[inputTsBlocks.length]; // one read cursor per child block, advanced by next()
}
@Override
@@ -64,7 +93,6 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
long startTime = System.nanoTime();
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
- // 1. fill consumed up TsBlock
if (!prepareInput()) {
return null;
}
@@ -72,9 +100,71 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
tsBlockBuilder.reset();
TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- for (TsBlock tsBlock : inputTsBlocks) {
- timeBuilder.writeLong(tsBlock.getTimeColumn().getLong(0));
-
valueColumnBuilders[0].writeBinary(tsBlock.getValueColumns()[0].getBinary(0));
+
+ while (true) {
+ currentDevice = null;
+
+ for (int idx = 0; idx < inputTsBlocks.length; idx++) {
+ TsBlock tsBlock = inputTsBlocks[idx];
+ if (!noMoreTsBlocks[idx] && tsBlock == null) {
+ return null;
+ }
+
+ if (readIndex[idx] >= tsBlock.getPositionCount()) {
+ inputTsBlocks[idx] = null;
+ }
+
+ Binary device = tsBlock.getColumn(0).getBinary(readIndex[idx]);
+
+ if (currentDevice == null) {
+ currentDevice = device;
+ } else {
+ if (deviceComparator.compare(device, currentDevice) < 0) {
+ currentDevice = device;
+ }
+ }
+ }
+
+ if (currentDevice == null) {
+ break;
+ }
+
+ for (int idx = 0; idx < inputTsBlocks.length; idx++) {
+ TsBlock tsBlock = inputTsBlocks[idx];
+ if (tsBlock == null) {
+ continue;
+ }
+
+ if (readIndex[idx] >= tsBlock.getPositionCount()) {
+ inputTsBlocks[idx] = null;
+ }
+
+ Binary device = tsBlock.getColumn(0).getBinary(readIndex[idx]);
+ if (device.equals(currentDevice)) {
+ currentTime = tsBlock.getTimeColumn().getLong(readIndex[idx]);
+ int cnt = 0;
+ for (int i = 0; i < accumulators.size(); i++) {
+ Accumulator accumulator = accumulators.get(i);
+ if (newAggregationIdx.get(i) == 2) {
+ accumulator.addIntermediate(tsBlock.getColumns(new int[2]{cnt++,
cnt+}));
+ } else {
+ accumulator.addIntermediate(tsBlock.getColumns(new int[]));
+ }
+ }
+ readIndex[idx] ++;
+ }
+ }
+
+ timeBuilder.writeLong(currentTime);
+ for (int i = 1; i < dataTypes.size(); i++) {
+ accumulators.get(i).outputFinal(valueColumnBuilders[i]);
+ }
+
+ currentDevice = null;
+
+ if (System.nanoTime() - startTime > maxRuntime ||
tsBlockBuilder.isFull()) {
+ break;
+ }
}
return tsBlockBuilder.build();
@@ -82,38 +172,58 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
@Override
public boolean hasNext() throws Exception { // true while any child still buffers a block or may still produce one
- // TODO the child of DeviceViewNode already calc TimeRange?
- // return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
-
if (finished) {
return false;
}
+
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!isEmpty(i)) {
+ if (isInputNotEmpty(i)) { // a buffered, non-empty block is still waiting to be consumed
return true;
} else if (!noMoreTsBlocks[i]) {
if (!canCallNext[i] || children.get(i).hasNextWithTimer()) { // child not yet marked callable, or it reports more data
return true;
} else {
- children.get(i).close();
- children.set(i, null);
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
+ handleFinishedChild(i); // child is exhausted: mark it done, close it and clear its slot
}
}
}
+
return false;
}
+ @Override
+ public ListenableFuture<?> isBlocked() { // NOT_BLOCKED when some pending child is ready or none is pending; otherwise a combined future over the blocked children
+ boolean hasReadyChild = false;
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (noMoreTsBlocks[i] || isInputNotEmpty(i) || children.get(i) == null) { // skip finished children and children whose block is still buffered
+ continue;
+ }
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
+ if (blocked.isDone()) {
+ hasReadyChild = true;
+ // only when not blocked, canCallNext[i] equals true
+ canCallNext[i] = true;
+ } else {
+ listenableFutures.add(blocked); // remember the pending future so the caller can wait on it
+ }
+ }
+
+ return (hasReadyChild || listenableFutures.isEmpty())
+ ? NOT_BLOCKED
+ : successfulAsList(listenableFutures);
+ }
+
@Override
public boolean isFinished() throws Exception {
if (finished) {
return true;
}
- finished = true;
+ finished = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] || !isEmpty(i)) {
+ if (!noMoreTsBlocks[i] || isInputNotEmpty(i)) {
finished = false;
break;
}
@@ -121,6 +231,25 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
return finished;
}
+ @Override
+ public void close() throws Exception {
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ final Operator operator = children.get(i);
+ if (operator != null) {
+ operator.close();
+ }
+ }
+ }
+
+ @Override
+ protected void handleFinishedChild(int currentChildIndex) throws Exception {
+   // Invoked when children.get(currentChildIndex).hasNext returns false:
+   // mark the input as exhausted, drop its buffered block, then close the child and clear its slot.
+   noMoreTsBlocks[currentChildIndex] = true;
+   inputTsBlocks[currentChildIndex] = null;
+   children.get(currentChildIndex).close();
+   children.set(currentChildIndex, null);
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -135,4 +264,8 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
public long calculateRetainedSizeAfterCallingNext() {
return 0; // NOTE(review): reports no retained memory although inputTsBlocks can hold buffered blocks — confirm this is a placeholder
}
+
+ private boolean isInputNotEmpty(int index) { // true when a block is buffered for this child and that block is not empty
+ return inputTsBlocks[index] != null && !inputTsBlocks[index].isEmpty();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 61174b5e776..b052c007bc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
import
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import
org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
@@ -147,6 +148,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaC
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
+import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -277,6 +279,7 @@ import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.getOutputColumnSizePerLine;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor.getAggregationTypeByFuncName;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
import static
org.apache.iotdb.db.utils.TimestampPrecisionUtils.TIMESTAMP_PRECISION;
@@ -872,24 +875,53 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
- // for (Expression expression : selectExpressions) {
- // if (expression instanceof FunctionExpression) {
- // FunctionExpression functionExpression = (FunctionExpression)
expression;
- // String functionName = functionExpression.getFunctionName();
- // expression.getExpressionType();
- // Accumulator accumulator = AccumulatorFactory.createAccumulator(
- // functionName,
- // aggregationType,
- //
- //
Collections.singletonList(context.getTypeProvider().getType(functionExpression.getOutputSymbol())),
- // null,
- // null,
- // true,
- // true);
- // }
- // }
-
- return new AggregationMergeSortOperator(operatorContext, children,
dataTypes);
+ List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
+ List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+ List<TSDataType> sortItemDataTypeList = new
ArrayList<>(sortItemList.size());
+ genSortInformation(
+ node.getOutputColumnNames(),
+ dataTypes,
+ sortItemList,
+ sortItemIndexList,
+ sortItemDataTypeList);
+
+ boolean timeAscending = true;
+ TimeComparator timeComparator = ASC_TIME_COMPARATOR;
+ Comparator<Binary> deviceComparator = ASC_BINARY_COMPARATOR;
+ for (SortItem sortItem : sortItemList) {
+ if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getSortKey()))
{
+ if (sortItem.getOrdering() == Ordering.DESC) {
+ timeAscending = false;
+ timeComparator = DESC_TIME_COMPARATOR;
+ }
+ } else if ("Device".equalsIgnoreCase(sortItem.getSortKey())) {
+ if (sortItem.getOrdering() == Ordering.DESC) {
+ deviceComparator = DESC_BINARY_COMPARATOR;
+ }
+ }
+ }
+
+ List<Accumulator> accumulators = new ArrayList<>();
+ for (Expression expression : node.getSelectExpressions()) {
+ if (expression instanceof FunctionExpression) {
+ FunctionExpression functionExpression = (FunctionExpression)
expression;
+ String aggregationName = functionExpression.getFunctionName();
+ Accumulator accumulator =
+ AccumulatorFactory.createAccumulator(
+ aggregationName,
+ getAggregationTypeByFuncName(aggregationName),
+ Collections.singletonList(
+
context.getTypeProvider().getType(functionExpression.getOutputSymbol())),
+ null,
+ null,
+ timeAscending,
+ false);
+ accumulators.add(accumulator);
+ }
+ }
+
+ return new AggregationMergeSortOperator(
+ operatorContext, children, dataTypes, accumulators, timeComparator,
deviceComparator);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 57c807c8a41..022345dd250 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -74,6 +73,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.Arrays;
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
import static
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
@@ -228,55 +229,66 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
if (analysis.isExistDeviceCrossRegion() &&
analysis.isDeviceViewSpecialProcess()) {
// return processSpecialDeviceView(node, context);
- // TODO 1. generate old and new measurement idx relationship 2. generate
new outputColumns for
+ // TODO 1. generate old and new measurement idx relationship
+ // TODO 2. generate new outputColumns for
// each subDeviceView
-// Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
-// List<String> newPartialOutputColumns = new ArrayList<>();
-//
+ Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
+ List<String> newPartialOutputColumns = new ArrayList<>();
+
Set<Expression> selectExpressions = analysis.getSelectExpressions();
-//
-// int i = 0, idxSum = 0;
-// for (Expression expression : selectExpressions) {
-// if (i == 0) {
-// // device
-// newPartialOutputColumns.add(expression.getOutputSymbol());
-// i++;
-// idxSum++;
-// continue;
-// }
-// FunctionExpression aggExpression = (FunctionExpression) expression;
-// List<String> actualPartialAggregationNames =
-//
getActualPartialAggregationNames(aggExpression.getFunctionName());
-// for (String actualAggName : actualPartialAggregationNames) {
-// newPartialOutputColumns.add(
-// new FunctionExpression(
-// actualAggName,
-// aggExpression.getFunctionAttributes(),
-// aggExpression.getExpressions())
-// .getOutputSymbol());
-// }
-// // TODO need update typeProvider?
-// if (actualPartialAggregationNames.size() > 1) {
-// newMeasurementIdxMap.put(i, Arrays.asList(idxSum++, idxSum++));
-// } else {
-// newMeasurementIdxMap.put(i, Collections.singletonList(idxSum++));
-// }
-// i++;
-// }
-
-// for (String device : node.getDevices()) {
-// List<Integer> oldMeasurementList =
-// node.getDeviceToMeasurementIndexesMap().get(device);
-// List<Integer> newMeasurementList = new ArrayList<>();
-// for (int idx : oldMeasurementList) {
-// newMeasurementList.addAll(newMeasurementIdxMap.get(idx));
-// }
-// node.getDeviceToMeasurementIndexesMap().put(device,
newMeasurementList);
-// }
+ int[] newAggregationIdx = new int[selectExpressions.size()];
+
+ int i = 0, idxSum = 0;
+ for (Expression expression : selectExpressions) {
+ if (i == 0) {
+ // device
+ newPartialOutputColumns.add(expression.getOutputSymbol());
+ i++;
+ idxSum++;
+ continue;
+ }
+ FunctionExpression aggExpression = (FunctionExpression) expression;
+ // used for AVG, FIRST_VALUE, LAST_VALUE, TIME_DURATION agg function
+ List<String> actualPartialAggregationNames =
+ getActualPartialAggregationNames(aggExpression.getFunctionName());
+ for (String actualAggName : actualPartialAggregationNames) {
+ FunctionExpression partialFunctionExpression =
+ new FunctionExpression(
+ actualAggName,
+ aggExpression.getFunctionAttributes(),
+ aggExpression.getExpressions());
+ if (actualPartialAggregationNames.size() > 1) {
+ TSDataType dataType = analyzeExpression(analysis,
partialFunctionExpression);
+ context
+ .queryContext
+ .getTypeProvider()
+ .setType(partialFunctionExpression.getOutputSymbol(),
dataType);
+ }
+
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
+ }
+
+ newAggregationIdx[i] = actualPartialAggregationNames.size();
+ // TODO need update typeProvider?
+ if (actualPartialAggregationNames.size() > 1) {
+ newMeasurementIdxMap.put(i, Arrays.asList(idxSum++, idxSum++));
+ } else {
+ newMeasurementIdxMap.put(i, Collections.singletonList(idxSum++));
+ }
+ i++;
+ }
+
+ for (String device : node.getDevices()) {
+ List<Integer> oldMeasurementList =
node.getDeviceToMeasurementIndexesMap().get(device);
+ List<Integer> newMeasurementList = new ArrayList<>();
+ for (int idx : oldMeasurementList) {
+ newMeasurementList.addAll(newMeasurementIdxMap.get(idx));
+ }
+ node.getDeviceToMeasurementIndexesMap().put(device,
newMeasurementList);
+ }
for (PlanNode planNode : deviceViewNodeList) {
DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
- // deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
+ deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
transferAggregatorsRecursively2(planNode, context);
}
@@ -285,7 +297,8 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
context.queryContext.getQueryId().genPlanNodeId(),
node.getMergeOrderParameter(),
node.getOutputColumnNames(),
- selectExpressions);
+ selectExpressions,
+ newAggregationIdx);
deviceViewNodeList.forEach(mergeSortNode::addChild);
return Collections.singletonList(mergeSortNode);
} else {
@@ -410,16 +423,15 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
List<String> aggregationNames = descriptor.getActualAggregationNames(true);
for (String aggregationName : aggregationNames) {
newDescriptorList.add(
- new AggregationDescriptor(
- aggregationName,
- AggregationStep.PARTIAL,
- descriptor.getInputExpressions(),
- descriptor.getInputAttributes()));
+ new AggregationDescriptor(
+ aggregationName,
+ AggregationStep.PARTIAL,
+ descriptor.getInputExpressions(),
+ descriptor.getInputAttributes()));
}
}
scanSourceNode.setAggregationDescriptorList(newDescriptorList);
}
-
}
}
@@ -759,9 +771,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
descriptor.getInputExpressions(),
descriptor.getInputAttributes())));
leafAggDescriptorList.forEach(
- d ->
- updateTypeProviderByPartialAggregation(
- d, context.queryContext.getTypeProvider()));
+ d -> updateTypeProviderByPartialAggregation(d, context.queryContext.getTypeProvider()));
List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
@@ -1490,8 +1500,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE);
descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
descriptorList.add(descriptor);
- updateTypeProviderByPartialAggregation(
- descriptor, context.queryContext.getTypeProvider());
+ updateTypeProviderByPartialAggregation(descriptor, context.queryContext.getTypeProvider());
}
handle.setGroupByLevelDescriptors(descriptorList);
}
@@ -1588,8 +1597,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
.forEach(
d -> {
d.setStep(AggregationStep.PARTIAL);
- updateTypeProviderByPartialAggregation(
- d, context.queryContext.getTypeProvider());
+ updateTypeProviderByPartialAggregation(d, context.queryContext.getTypeProvider());
});
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index c446715efae..dd7dc6cd404 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -43,15 +43,19 @@ public class AggregationMergeSortNode extends MultiChildProcessNode {
private final Set<Expression> selectExpressions;
+ private final int[] newAggregationIdx;
+
public AggregationMergeSortNode(
PlanNodeId id,
OrderByParameter mergeOrderParameter,
List<String> outputColumns,
- Set<Expression> selectExpressions) {
+ Set<Expression> selectExpressions,
+ int[] newAggregationIdx) {
super(id);
this.mergeOrderParameter = mergeOrderParameter;
this.outputColumns = outputColumns;
this.selectExpressions = selectExpressions;
+ this.newAggregationIdx = newAggregationIdx;
}
public AggregationMergeSortNode(
@@ -59,21 +63,31 @@ public class AggregationMergeSortNode extends MultiChildProcessNode {
List<PlanNode> children,
OrderByParameter mergeOrderParameter,
List<String> outputColumns,
- Set<Expression> selectExpressions) {
+ Set<Expression> selectExpressions,
+ int[] newAggregationIdx) {
super(id, children);
this.mergeOrderParameter = mergeOrderParameter;
this.outputColumns = outputColumns;
this.selectExpressions = selectExpressions;
+ this.newAggregationIdx = newAggregationIdx;
}
public OrderByParameter getMergeOrderParameter() {
return mergeOrderParameter;
}
+ public Set<Expression> getSelectExpressions() {
+ return this.selectExpressions;
+ }
+
@Override
public PlanNode clone() {
return new AggregationMergeSortNode(
- getPlanNodeId(), getMergeOrderParameter(), outputColumns, selectExpressions);
+ getPlanNodeId(),
+ getMergeOrderParameter(),
+ outputColumns,
+ selectExpressions,
+ newAggregationIdx);
}
@Override
@@ -83,7 +97,8 @@ public class AggregationMergeSortNode extends MultiChildProcessNode {
new ArrayList<>(children.subList(startIndex, endIndex)),
getMergeOrderParameter(),
outputColumns,
- selectExpressions);
+ selectExpressions,
+ null);
}
@Override
@@ -125,7 +140,7 @@ public class AggregationMergeSortNode extends MultiChildProcessNode {
columnSize--;
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new AggregationMergeSortNode(planNodeId, orderByParameter, outputColumns, null);
+ return new AggregationMergeSortNode(planNodeId, orderByParameter, outputColumns, null, null);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index ff6abfe2017..c695c2280b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -81,7 +81,7 @@ public class AggregationDescriptor {
this.inputAttributes = inputAttributes;
}
- private TAggregationType getAggregationTypeByFuncName(String funcName) {
+ public static TAggregationType getAggregationTypeByFuncName(String funcName) {
if (isBuiltinAggregationName(funcName.toLowerCase())) {
return TAggregationType.valueOf(funcName.toUpperCase());
} else {