This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0356975bad1 Support aggregation push-down optimize for table view of non aligned series (#17294)
0356975bad1 is described below
commit 0356975bad1cd5b1ed7cf52b1097428ca7eeecd9
Author: Weihao Li <[email protected]>
AuthorDate: Fri Mar 20 09:29:03 2026 +0800
Support aggregation push-down optimize for table view of non aligned series (#17294)
---
.../query/view/recent/IoTDBTableViewQueryIT.java | 52 ++-
.../relational/AbstractAggTableScanOperator.java | 23 +-
.../relational/DeviceIteratorScanOperator.java | 15 +-
.../relational/LastQueryAggTableScanOperator.java | 2 +-
...onAlignedDeviceViewAggregationScanOperator.java | 259 ++++++++++++++
.../plan/planner/TableOperatorGenerator.java | 106 +++++-
.../plan/planner/plan/node/PlanNodeType.java | 13 +-
.../plan/planner/plan/node/PlanVisitor.java | 12 +
.../planner/plan/parameter/SeriesScanOptions.java | 13 +-
.../distribute/TableDistributedPlanGenerator.java | 394 +++++++++++++++------
.../node/AggregationTreeDeviceViewScanNode.java | 15 +-
... AlignedAggregationTreeDeviceViewScanNode.java} | 113 ++----
...nAlignedAggregationTreeDeviceViewScanNode.java} | 113 ++----
.../PushAggregationIntoTableScan.java | 4 -
.../operator/DeviceIteratorScanOperatorTest.java | 8 +-
.../plan/relational/analyzer/TreeViewTest.java | 82 ++++-
.../planner/assertions/PlanMatchPattern.java | 34 ++
17 files changed, 928 insertions(+), 330 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewQueryIT.java
index 960dfa8ac22..3dc7386a71a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewQueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewQueryIT.java
@@ -349,8 +349,8 @@ public class IoTDBTableViewQueryIT {
false);
compareQueryResults(
session,
- "select * from (select time, battery as device1 from view1 where
battery = 'b1') as t1 full outer join (select time, battery as device2 from
view2 where battery = 'b') as t2 using(time)",
- "select * from (select time, battery as device1 from table1 where
battery = 'b1') as t1 full outer join (select time, battery as device2 from
table1 where battery = 'b') as t2 using(time)",
+ "select * from (select time, battery as battery1 from view1 where
battery = 'b1') as t1 full outer join (select time, battery as battery2 from
view2 where battery = 'b') as t2 using(time)",
+ "select * from (select time, battery as battery1 from table1 where
battery = 'b1') as t1 full outer join (select time, battery as battery2 from
table1 where battery = 'b') as t2 using(time)",
true);
compareQueryResults(
session,
@@ -380,6 +380,54 @@ public class IoTDBTableViewQueryIT {
"select count(distinct battery) from view4 where battery = 'b1'",
"select count(distinct battery) from table1 where battery = 'b1'",
true);
+
+ compareQueryResults(
+ session,
+ "select count(time) from view1 where time > 604800000",
+ "select count(time) from table1 where time > 604800000",
+ true);
+
+ compareQueryResults(
+ session,
+ "select count(battery),count(time) from view1 where time >
604800000",
+ "select count(battery),count(time) from table1 where time >
604800000",
+ true);
+
+ compareQueryResults(
+ session,
+ "select count(battery),count(time),count(current) from view1 where
time > 604800000",
+ "select count(battery),count(time),count(current) from table1 where
time > 604800000",
+ true);
+
+ compareQueryResults(
+ session,
+ "select distinct battery from view1 where time > 604800000",
+ "select distinct battery from table1 where time > 604800000",
+ true);
+
+ compareQueryResults(
+ session,
+ "select count(current) from view1 where time > 604800000 group by
battery",
+ "select count(current) from table1 where time > 604800000 group by
battery",
+ true);
+
+ compareQueryResults(
+ session,
+ "select count(current) from view1 group by date_bin(2ms,time)",
+ "select count(current) from table1 group by date_bin(2ms,time)",
+ true);
+
+ compareQueryResults(
+ session,
+ "select count(current) from view1 group by
battery,date_bin(2ms,time)",
+ "select count(current) from table1 group by
battery,date_bin(2ms,time)",
+ true);
+
+ compareQueryResults(
+ session,
+ "select last(columns(*)) from view1 group by
battery,date_bin(2ms,time)",
+ "select last(columns(*)) from table1 group by
battery,date_bin(2ms,time)",
+ true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index a59ec643b07..9d3979bafec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOpt
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -71,7 +72,7 @@ import static
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTim
public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOperator {
private boolean finished = false;
- private TsBlock inputTsBlock;
+ protected TsBlock inputTsBlock;
protected List<TableAggregator> tableAggregators;
protected final List<ColumnSchema> groupingKeySchemas;
@@ -104,11 +105,11 @@ public abstract class AbstractAggTableScanOperator
extends AbstractDataSourceOpe
// e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0]
protected List<Integer> aggregatorInputChannels;
- private QueryDataSource queryDataSource;
+ protected QueryDataSource queryDataSource;
protected ITableTimeRangeIterator timeIterator;
- private boolean allAggregatorsHasFinalResult = false;
+ protected boolean allAggregatorsHasFinalResult = false;
protected AbstractAggTableScanOperator(AbstractAggTableScanOperatorParameter
parameter) {
@@ -193,7 +194,7 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
}
/** Return true if we have the result of this timeRange. */
- protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
+ protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange()
throws Exception {
try {
if (calcFromCachedData()) {
updateResultTsBlock();
@@ -706,7 +707,7 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
return true;
}
- private void checkIfAllAggregatorHasFinalResult() {
+ protected void checkIfAllAggregatorHasFinalResult() throws Exception {
if (allAggregatorsHasFinalResult
&& (timeIterator.getType() ==
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
|| tableAggregators.isEmpty())) {
@@ -729,7 +730,7 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
}
}
- private void nextDevice() {
+ protected void nextDevice() throws Exception {
currentDeviceIndex++;
this.operatorContext.recordSpecifiedInfo(
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
@@ -812,6 +813,8 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
protected List<DeviceEntry> deviceEntries;
protected int deviceCount;
+ private List<Symbol> outputSymbols;
+
public AbstractAggTableScanOperatorParameter(
PlanNodeId sourceId,
OperatorContext context,
@@ -830,7 +833,8 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
boolean ascending,
boolean canUseStatistics,
List<Integer> aggregatorInputChannels,
- String timeColumnName) {
+ String timeColumnName,
+ List<Symbol> outputSymbols) {
this.sourceId = sourceId;
this.context = context;
this.aggColumnSchemas = aggColumnSchemas;
@@ -849,6 +853,11 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
this.canUseStatistics = canUseStatistics;
this.aggregatorInputChannels = aggregatorInputChannels;
this.timeColumnName = timeColumnName;
+ this.outputSymbols = outputSymbols;
+ }
+
+ public List<Symbol> getOutputSymbols() {
+ return outputSymbols;
}
public OperatorContext getOperatorContext() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
index f088e448fd7..14044158045 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -113,7 +114,7 @@ public class DeviceIteratorScanOperator extends
AbstractDataSourceOperator {
}
DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
-
deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry);
+
deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry,
true);
currentDeviceRootOperator =
deviceChildOperatorTreeGenerator.getCurrentDeviceRootOperator();
dataSourceOperators =
deviceChildOperatorTreeGenerator.getCurrentDeviceDataSourceOperators();
currentDeviceInit = false;
@@ -185,7 +186,13 @@ public class DeviceIteratorScanOperator extends
AbstractDataSourceOperator {
return INSTANCE_SIZE
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(currentDeviceRootOperator)
- + RamUsageEstimator.sizeOfCollection(deviceEntries);
+ + RamUsageEstimator.sizeOfCollection(deviceEntries)
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(
+ deviceChildOperatorTreeGenerator);
+ }
+
+ public DeviceChildOperatorTreeGenerator
getDeviceChildOperatorTreeGenerator() {
+ return deviceChildOperatorTreeGenerator;
}
public static class TreeNonAlignedDeviceViewScanParameters {
@@ -212,12 +219,12 @@ public class DeviceIteratorScanOperator extends
AbstractDataSourceOperator {
}
}
- public interface DeviceChildOperatorTreeGenerator {
+ public interface DeviceChildOperatorTreeGenerator extends Accountable {
// Do the offset and limit operator need to keep after the device iterator
boolean keepOffsetAndLimitOperatorAfterDeviceIterator();
// Generate the following operator subtree based on the current deviceEntry
- void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry);
+ void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry, boolean
needAdaptor);
// Returns the root operator of the subtree
Operator getCurrentDeviceRootOperator();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
index 9b4c37995de..2beccf46d8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
@@ -156,7 +156,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
}
/** Main process logic, calc the last aggregation results of current device. */
- private void processCurrentDevice() {
+ private void processCurrentDevice() throws Exception {
if (currentHitCacheIndex < hitCachesIndexes.size()
&& outputDeviceIndex == hitCachesIndexes.get(currentHitCacheIndex)) {
currentDeviceEntry = cachedDeviceEntries.get(currentHitCacheIndex);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java
new file mode 100644
index 00000000000..4258f335a12
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+
+public class TreeNonAlignedDeviceViewAggregationScanOperator
+ extends AbstractDefaultAggTableScanOperator {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(
+ TreeNonAlignedDeviceViewAggregationScanOperator.class);
+
+ private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+ private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator
childOperatorGenerator;
+
+ private Operator child;
+ private List<Operator> dataSourceOperators;
+
+ public TreeNonAlignedDeviceViewAggregationScanOperator(
+ AbstractAggTableScanOperatorParameter parameter,
+ IDeviceID.TreeDeviceIdColumnValueExtractor extractor,
+ DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator
childOperatorGenerator) {
+ super(parameter);
+ this.extractor = extractor;
+ this.childOperatorGenerator = childOperatorGenerator;
+ constructCurrentDeviceOperatorTree();
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ // Don't call isBlocked of child if the device has been consumed up, because the child operator
+ // may have been closed
+ return currentDeviceIndex >= deviceCount ? NOT_BLOCKED : child.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ if (retainedTsBlock != null) {
+ return getResultFromRetainedTsBlock();
+ }
+
+ // optimize for sql: select count(*) from (select count(s1), sum(s1) from table)
+ if (tableAggregators.isEmpty()
+ && timeIterator.getType() ==
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
+ && resultTsBlockBuilder.getValueColumnBuilders().length == 0) {
+ resultTsBlockBuilder.reset();
+ currentDeviceIndex = deviceCount;
+ timeIterator.setFinished();
+ Column[] valueColumns = new Column[0];
+ return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,
1), valueColumns);
+ }
+
+ // calculate aggregation result on current time window
+ // return true if current time window is calc finished
+ Optional<Boolean> b = calculateAggregationResultForCurrentTimeRange();
+ if (b.isPresent() && b.get()) {
+ timeIterator.resetCurTimeRange();
+ buildResultTsBlock();
+ }
+
+ if (resultTsBlockBuilder.isFull()) {
+ buildResultTsBlock();
+ }
+
+ if (resultTsBlock == null) {
+ return null;
+ }
+ return checkTsBlockSizeAndGetResult();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (child != null) {
+ child.close();
+ }
+ }
+
+ @Override
+ String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
+ return (String) extractor.extract(deviceEntry.getDeviceID(),
idColumnIndex);
+ }
+
+ @Override
+ protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange()
throws Exception {
+ // Try to calculate from cached data
+ if (calcFromCachedData()) {
+ updateResultTsBlock();
+ checkIfAllAggregatorHasFinalResult();
+ return Optional.of(true);
+ }
+
+ // Read from child operator
+ if (readAndCalcFromChild()) {
+ updateResultTsBlock();
+ checkIfAllAggregatorHasFinalResult();
+ return Optional.of(true);
+ }
+
+ // No more data from child, finish the current device
+ if (!child.hasNext()) {
+ updateResultTsBlock();
+ timeIterator.resetCurTimeRange();
+ nextDevice();
+
+ if (currentDeviceIndex >= deviceCount) {
+ // All devices consumed
+ timeIterator.setFinished();
+ return Optional.of(true);
+ } else {
+ // More devices to process, child should provide next device's data
+ return Optional.of(false);
+ }
+ }
+
+ return Optional.of(false);
+ }
+
+ /** Read data from child operator and calculate aggregation. */
+ private boolean readAndCalcFromChild() throws Exception {
+ if (child.hasNext()) {
+ // Get next TsBlock from child
+ TsBlock tsBlock = child.nextWithTimer();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ return false;
+ }
+ // Calculate aggregation from raw data
+ return calcUsingRawData(tsBlock);
+ }
+ return false;
+ }
+
+ @Override
+ protected void nextDevice() throws Exception {
+ currentDeviceIndex++;
+ childOperatorGenerator.getCurrentDeviceStartCloseOperator().close();
+ if (currentDeviceIndex >= deviceEntries.size()) {
+ return;
+ }
+ constructCurrentDeviceOperatorTree();
+ queryDataSource.reset();
+ initQueryDataSource(queryDataSource);
+ this.operatorContext.recordSpecifiedInfo(
+ CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
+ }
+
+ private void constructCurrentDeviceOperatorTree() {
+ if (this.deviceEntries.isEmpty()) {
+ return;
+ }
+ if (this.deviceEntries.get(this.currentDeviceIndex) == null) {
+ throw new IllegalStateException(
+ "Device entries of index " + this.currentDeviceIndex + " is empty");
+ }
+ DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
+
+ childOperatorGenerator.generateCurrentDeviceOperatorTree(deviceEntry,
false);
+ child = childOperatorGenerator.getCurrentDeviceRootOperator();
+ dataSourceOperators =
childOperatorGenerator.getCurrentDeviceDataSourceOperators();
+ }
+
+ /** same with {@link DeviceIteratorScanOperator#initQueryDataSource(IQueryDataSource)} */
+ @Override
+ public void initQueryDataSource(IQueryDataSource dataSource) {
+ if (resultTsBlockBuilder == null) {
+ // only need to do this when init firstly
+ this.queryDataSource = (QueryDataSource) dataSource;
+ this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
+ }
+
+ if (dataSourceOperators == null || dataSourceOperators.isEmpty()) {
+ return;
+ }
+
+ for (Operator operator : dataSourceOperators) {
+ ((AbstractDataSourceOperator) operator).initQueryDataSource(dataSource);
+ }
+ }
+
+ @Override
+ protected void checkIfAllAggregatorHasFinalResult() throws Exception {
+ if (allAggregatorsHasFinalResult
+ && (timeIterator.getType() ==
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
+ || tableAggregators.isEmpty())) {
+ nextDevice();
+ inputTsBlock = null;
+
+ if (currentDeviceIndex >= deviceCount) {
+ // all devices have been consumed
+ timeIterator.setFinished();
+ }
+
+ allAggregatorsHasFinalResult = false;
+ }
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
+ + (resultTsBlockBuilder == null ? 0 :
resultTsBlockBuilder.getRetainedSizeInBytes())
+ + RamUsageEstimator.sizeOfCollection(deviceEntries)
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(childOperatorGenerator);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 63705c78910..fa2be3746ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -135,6 +135,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Merg
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeNonAlignedDeviceViewAggregationScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
@@ -185,6 +186,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
@@ -203,6 +205,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -283,6 +286,7 @@ import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -584,6 +588,19 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
private List<Expression> cannotPushDownConjuncts;
private boolean removeUpperOffsetAndLimitOperator;
+ private final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(this.getClass());
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + (seriesScanOptionsList == null
+ ? 0L
+ : seriesScanOptionsList.stream()
+ .mapToLong(seriesScanOption ->
seriesScanOption.ramBytesUsed())
+ .sum());
+ }
+
@Override
public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() {
calculateSeriesScanOptionsList();
@@ -591,9 +608,15 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
@Override
- public void generateCurrentDeviceOperatorTree(DeviceEntry
deviceEntry) {
+ public void generateCurrentDeviceOperatorTree(
+ DeviceEntry deviceEntry, boolean needAdaptor) {
calculateSeriesScanOptionsList();
- operator = constructTreeToTableViewAdaptorOperator(deviceEntry);
+ if (needAdaptor) {
+ operator = constructTreeToTableViewAdaptorOperator(deviceEntry);
+ } else {
+ seriesScanOperators = new ArrayList<>(measurementSchemas.size());
+ operator = constructAndJoinScanOperators(deviceEntry);
+ }
boolean needToPruneColumn =
node.getAssignments().size() != node.getOutputSymbols().size();
if (isSingleColumn) {
@@ -2823,6 +2846,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
@Override
public Operator visitAggregationTreeDeviceViewScan(
AggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext
context) {
+ throw new UnsupportedOperationException(
+ "The AggregationTreeDeviceViewScanNode should has been transferred to
its child class node");
+ }
+
+ @Override
+ public Operator visitAlignedAggregationTreeDeviceViewScan(
+ AlignedAggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext
context) {
QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName();
TsTable tsTable =
DataNodeTableCache.getInstance()
@@ -2848,10 +2878,74 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
parameter.getMeasurementColumnNames(),
parameter.getMeasurementSchemas(),
parameter.getAllSensors(),
- AggregationTreeDeviceViewScanNode.class.getSimpleName());
+ AlignedAggregationTreeDeviceViewScanNode.class.getSimpleName());
return treeAlignedDeviceViewAggregationScanOperator;
}
+ @Override
+ public Operator visitNonAlignedAggregationTreeDeviceViewScan(
+ NonAlignedAggregationTreeDeviceViewScanNode node,
LocalExecutionPlanContext context) {
+ QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName();
+ TsTable tsTable =
+ DataNodeTableCache.getInstance()
+ .getTable(qualifiedObjectName.getDatabaseName(),
qualifiedObjectName.getObjectName());
+ IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
+
createTreeDeviceIdColumnValueExtractor(DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable));
+
+ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter =
+ constructAbstractAggTableScanOperatorParameter(
+ node,
+ context,
+
TreeNonAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
+ node.getMeasurementColumnNameMap(),
+ tsTable.getCachedTableTTL());
+
+ // construct source operator (generator)
+ TreeNonAlignedDeviceViewScanNode scanNode =
+ new TreeNonAlignedDeviceViewScanNode(
+ node.getPlanNodeId(),
+ node.getQualifiedObjectName(),
+ // the outputSymbols of TreeNonAlignedDeviceViewAggregationScanOperator is not equals
+ // with TreeNonAlignedDeviceViewScanNode
+ parameter.getOutputSymbols(),
+ node.getAssignments(),
+ node.getDeviceEntries(),
+ node.getTagAndAttributeIndexMap(),
+ node.getScanOrder(),
+ node.getTimePredicate().orElse(null),
+ node.getPushDownPredicate(),
+ node.getPushDownLimit(),
+ node.getPushDownOffset(),
+ node.isPushLimitToEachDevice(),
+ true,
+ node.getTreeDBName(),
+ node.getMeasurementColumnNameMap());
+
+ Operator sourceOperator = visitTreeNonAlignedDeviceViewScan(scanNode,
context);
+ if (!(sourceOperator instanceof EmptyDataOperator)) {
+ // Use deviceChildOperatorTreeGenerator directly, we will control switch of devices in
+ // TreeNonAlignedDeviceViewAggregationScanOperator
+ TreeNonAlignedDeviceViewAggregationScanOperator aggTableScanOperator =
+ new TreeNonAlignedDeviceViewAggregationScanOperator(
+ parameter,
+ idColumnValueExtractor,
+ ((DeviceIteratorScanOperator)
sourceOperator).getDeviceChildOperatorTreeGenerator());
+
+ addSource(
+ aggTableScanOperator,
+ context,
+ node,
+ parameter.getMeasurementColumnNames(),
+ parameter.getMeasurementSchemas(),
+ parameter.getAllSensors(),
+ NonAlignedAggregationTreeDeviceViewScanNode.class.getSimpleName());
+ return aggTableScanOperator;
+ } else {
+ // source data is empty, return directly
+ return sourceOperator;
+ }
+ }
+
private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
constructAbstractAggTableScanOperatorParameter(
AggregationTableScanNode node,
@@ -2882,6 +2976,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
Map<Symbol, Integer> aggColumnLayout = new
HashMap<>(aggDistinctArgumentCount);
int[] aggColumnsIndexArray = new int[aggDistinctArgumentCount];
+ List<Symbol> outputSymbols = new ArrayList<>();
+
String timeColumnName = null;
int channel = 0;
int measurementColumnCount = 0;
@@ -2907,6 +3003,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
measurementSchemas.add(
new MeasurementSchema(realMeasurementName,
getTSDataType(schema.getType())));
measurementColumnsIndexMap.put(symbol.getName(),
measurementColumnCount - 1);
+ outputSymbols.add(symbol);
break;
case TIME:
aggColumnsIndexArray[channel] = -1;
@@ -3046,7 +3143,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
scanAscending,
canUseStatistic,
aggregatorInputChannels,
- timeColumnName);
+ timeColumnName,
+ outputSymbols);
}
// used for AggregationTableScanNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index b7178d649db..4779dc92fba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -119,7 +119,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDe
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExceptNode;
@@ -129,6 +129,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationS
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
@@ -311,6 +312,7 @@ public enum PlanNodeType {
TABLE_EXPLAIN_ANALYZE_NODE((short) 1019),
TABLE_ENFORCE_SINGLE_ROW_NODE((short) 1020),
INFORMATION_SCHEMA_TABLE_SCAN_NODE((short) 1021),
+ @Deprecated
AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1022),
TREE_ALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1023),
TREE_NONALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1024),
@@ -330,6 +332,8 @@ public enum PlanNodeType {
TABLE_ROW_NUMBER_NODE((short) 1038),
TABLE_VALUES_NODE((short) 1039),
TABLE_DISK_USAGE_INFORMATION_SCHEMA_TABLE_SCAN_NODE((short) 1040),
+ ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1041),
+ NON_ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1042),
RELATIONAL_INSERT_TABLET((short) 2000),
RELATIONAL_INSERT_ROW((short) 2001),
@@ -703,7 +707,8 @@ public enum PlanNodeType {
case 1021:
return InformationSchemaTableScanNode.deserialize(buffer);
case 1022:
- return AggregationTreeDeviceViewScanNode.deserialize(buffer);
+ throw new UnsupportedOperationException(
+ "AggregationTreeDeviceViewScanNode should not be deserialized");
case 1023:
return TreeAlignedDeviceViewScanNode.deserialize(buffer);
case 1024:
@@ -741,6 +746,10 @@ public enum PlanNodeType {
return ValuesNode.deserialize(buffer);
case 1040:
return
TableDiskUsageInformationSchemaTableScanNode.deserialize(buffer);
+ case 1041:
+ return AlignedAggregationTreeDeviceViewScanNode.deserialize(buffer);
+ case 1042:
+ return NonAlignedAggregationTreeDeviceViewScanNode.deserialize(buffer);
case 2000:
return RelationalInsertTabletNode.deserialize(buffer);
case 2001:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 957bfde5121..16ceb28be79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -125,6 +125,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupReference;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExceptNode;
@@ -134,6 +135,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationS
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
@@ -851,6 +853,16 @@ public abstract class PlanVisitor<R, C> {
return visitAggregationTableScan(node, context);
}
+ /**
+  * Visits an {@code AlignedAggregationTreeDeviceViewScanNode}. Unless a concrete visitor
+  * overrides this method, the node is handled by {@code visitAggregationTreeDeviceViewScan}.
+  *
+  * @param node the aligned aggregation scan node being visited
+  * @param context the visitor context, passed through unchanged
+  * @return whatever {@code visitAggregationTreeDeviceViewScan} returns for this node
+  */
+ public R visitAlignedAggregationTreeDeviceViewScan(
+     AlignedAggregationTreeDeviceViewScanNode node, C context) {
+   return this.visitAggregationTreeDeviceViewScan(node, context);
+ }
+
+ /**
+  * Visits a {@code NonAlignedAggregationTreeDeviceViewScanNode}. Unless a concrete visitor
+  * overrides this method, the node is handled by {@code visitAggregationTreeDeviceViewScan}.
+  *
+  * @param node the non-aligned aggregation scan node being visited
+  * @param context the visitor context, passed through unchanged
+  * @return whatever {@code visitAggregationTreeDeviceViewScan} returns for this node
+  */
+ public R visitNonAlignedAggregationTreeDeviceViewScan(
+     NonAlignedAggregationTreeDeviceViewScanNode node, C context) {
+   return this.visitAggregationTreeDeviceViewScan(node, context);
+ }
+
public R visitTreeAlignedDeviceViewScan(TreeAlignedDeviceViewScanNode node,
C context) {
return visitTreeDeviceViewScan(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index 7fc4d3f81fc..38335a788f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -23,18 +23,21 @@ import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeNonAlignedDeviceViewAggregationScanOperator;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.filter.factory.FilterFactory;
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
import org.apache.tsfile.read.reader.series.PaginationController;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-public class SeriesScanOptions {
+public class SeriesScanOptions implements Accountable {
private Filter globalTimeFilter;
private final Filter originalTimeFilter;
@@ -52,6 +55,9 @@ public class SeriesScanOptions {
private PaginationController paginationController;
private boolean isTableViewForTreeModel;
private long ttlForTableView = Long.MAX_VALUE;
+ // Shallow size of one SeriesScanOptions instance; this is the value ramBytesUsed() reports.
+ // It must be measured on this class, not on an operator class that merely holds the options.
+ private static final long INSTANCE_SIZE =
+     RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class);
public SeriesScanOptions(
Filter globalTimeFilter,
@@ -83,6 +89,11 @@ public class SeriesScanOptions {
return builder.build();
}
+ /**
+  * Reports the memory footprint of this object as the precomputed {@code INSTANCE_SIZE}
+  * constant. Nothing is added for objects this instance references.
+  *
+  * @return the value of {@code INSTANCE_SIZE}, in bytes
+  */
+ @Override
+ public long ramBytesUsed() {
+   return INSTANCE_SIZE;
+ }
+
public Filter getGlobalTimeFilter() {
return globalTimeFilter;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index dcc7c65b120..529bc9ffc57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -61,6 +61,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
@@ -76,6 +77,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1261,70 +1263,19 @@ public class TableDistributedPlanGenerator
@Override
public List<PlanNode> visitAggregationTableScan(
AggregationTableScanNode node, PlanContext context) {
- String dbName =
- node instanceof AggregationTreeDeviceViewScanNode
- ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
- : node.getQualifiedObjectName().getDatabaseName();
+ String dbName = node.getQualifiedObjectName().getDatabaseName();
DataPartition dataPartition = analysis.getDataPartitionInfo();
if (dbName == null || dataPartition == null) {
node.setRegionReplicaSet(NOT_ASSIGNED);
return Collections.singletonList(node);
}
- boolean needSplit = false;
- List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
- if (dataPartition != null) {
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap =
- dataPartition.getDataPartitionMap().get(dbName);
- if (seriesSlotMap == null) {
- throw new SemanticException(
- String.format("Given queried database: %s is not exist!", dbName));
- }
-
- Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new
HashMap<>();
- for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
- List<TRegionReplicaSet> regionReplicaSets =
- getDeviceReplicaSets(
- dataPartition,
- seriesSlotMap,
- deviceEntry.getDeviceID(),
- node.getTimeFilter(),
- cachedSeriesSlotWithRegions);
- if (regionReplicaSets.size() > 1) {
- needSplit = true;
- context.deviceCrossRegion = true;
-
queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache());
- }
- regionReplicaSetsList.add(regionReplicaSets);
- }
- }
- if (regionReplicaSetsList.isEmpty()) {
- regionReplicaSetsList =
Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
- }
+ AggregationDistributionInfo distributionInfo =
+ prepareAggregationDistribution(node, dbName, dataPartition, context);
Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new
HashMap<>();
- // Step is SINGLE and device data in more than one region, we need to
final aggregate the result
- // from different region here, so split
- // this node into two-stage
- needSplit = needSplit && node.getStep() == SINGLE;
- AggregationNode finalAggregation = null;
- if (needSplit) {
- Pair<AggregationNode, AggregationTableScanNode> splitResult =
- split(node, symbolAllocator, queryId);
- finalAggregation = splitResult.left;
- AggregationTableScanNode partialAggregation = splitResult.right;
-
- // cover case: complete push-down + group by + streamable
- if (!context.hasSortProperty && finalAggregation.isStreamable()) {
- OrderingScheme expectedOrderingSchema =
- constructOrderingSchema(node.getPreGroupedSymbols());
- context.setExpectedOrderingScheme(expectedOrderingSchema);
- }
-
- buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap,
partialAggregation);
- } else {
- buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node);
- }
+ buildRegionNodeMap(
+ node, distributionInfo.regionReplicaSetsList, regionNodeMap,
distributionInfo.templateNode);
List<PlanNode> resultTableScanNodeList = new ArrayList<>();
TRegionReplicaSet mostUsedDataRegion = null;
@@ -1349,6 +1300,189 @@ public class TableDistributedPlanGenerator
processSortProperty(node, resultTableScanNodeList, context);
}
+ if (distributionInfo.needSplit) {
+ AggregationNode finalAggregation = distributionInfo.finalAggregation;
+ if (resultTableScanNodeList.size() == 1) {
+ finalAggregation.setChild(resultTableScanNodeList.get(0));
+ } else if (resultTableScanNodeList.size() > 1) {
+ OrderingScheme childOrdering =
+
nodeOrderingMap.get(resultTableScanNodeList.get(0).getPlanNodeId());
+ finalAggregation.setChild(
+ mergeChildrenViaCollectOrMergeSort(childOrdering,
resultTableScanNodeList));
+ } else {
+ throw new IllegalStateException("List<PlanNode>.size should >= 1, but
now is 0");
+ }
+ resultTableScanNodeList = Collections.singletonList(finalAggregation);
+ }
+
+ return resultTableScanNodeList;
+ }
+
+ /**
+  * Copies {@code node} into a concrete {@link AlignedAggregationTreeDeviceViewScanNode} whose
+  * region replica set is {@code NOT_ASSIGNED}. Used when the scan cannot be bound to any data
+  * region: the base {@link AggregationTreeDeviceViewScanNode} does not support serialization, so
+  * it must never be handed to the later planning stages as-is.
+  */
+ private AlignedAggregationTreeDeviceViewScanNode createUnassignedAlignedScanNode(
+     AggregationTreeDeviceViewScanNode node) {
+   AlignedAggregationTreeDeviceViewScanNode unassignedNode =
+       new AlignedAggregationTreeDeviceViewScanNode(
+           node.getPlanNodeId(),
+           node.getQualifiedObjectName(),
+           node.getOutputSymbols(),
+           node.getAssignments(),
+           node.getDeviceEntries(),
+           node.getTagAndAttributeIndexMap(),
+           node.getScanOrder(),
+           node.getTimePredicate().orElse(null),
+           node.getPushDownPredicate(),
+           node.getPushDownLimit(),
+           node.getPushDownOffset(),
+           node.isPushLimitToEachDevice(),
+           node.containsNonAlignedDevice(),
+           node.getProjection(),
+           node.getAggregations(),
+           node.getGroupingSets(),
+           node.getPreGroupedSymbols(),
+           node.getStep(),
+           node.getGroupIdSymbol(),
+           node.getTreeDBName(),
+           node.getMeasurementColumnNameMap());
+   // The replica set has to be set on the node that is actually returned, not only on the
+   // original node that gets discarded.
+   unassignedNode.setRegionReplicaSet(NOT_ASSIGNED);
+   return unassignedNode;
+ }
+
+ /**
+  * Distributes an aggregation scan over a tree-model device view. For every data region the
+  * devices are grouped into at most one {@link AlignedAggregationTreeDeviceViewScanNode} and one
+  * {@link NonAlignedAggregationTreeDeviceViewScanNode}, depending on whether the device entry is
+  * an {@code AlignedDeviceEntry}. When a device spans several regions and the step is SINGLE, the
+  * per-region nodes compute the partial aggregation and are placed under a final aggregation.
+  *
+  * @param node the logical aggregation scan over the tree device view
+  * @param context distribution context; {@code mostUsedRegion} is updated here
+  * @return the distributed plan nodes replacing {@code node}
+  */
+ @Override
+ public List<PlanNode> visitAggregationTreeDeviceViewScan(
+     AggregationTreeDeviceViewScanNode node, PlanContext context) {
+   String dbName = node.getTreeDBName();
+   DataPartition dataPartition = analysis.getDataPartitionInfo();
+   if (dbName == null || dataPartition == null) {
+     node.setRegionReplicaSet(NOT_ASSIGNED);
+     return Collections.singletonList(createUnassignedAlignedScanNode(node));
+   }
+
+   AggregationDistributionInfo distributionInfo =
+       prepareAggregationDistribution(node, dbName, dataPartition, context);
+
+   List<List<TRegionReplicaSet>> regionReplicaSetsList = distributionInfo.regionReplicaSetsList;
+   AggregationTableScanNode templateNode = distributionInfo.templateNode;
+   AggregationNode finalAggregation = distributionInfo.finalAggregation;
+   boolean needSplit = distributionInfo.needSplit;
+   List<DeviceEntry> deviceEntries = node.getDeviceEntries();
+
+   Map<
+           TRegionReplicaSet,
+           Pair<
+               AlignedAggregationTreeDeviceViewScanNode,
+               NonAlignedAggregationTreeDeviceViewScanNode>>
+       tableScanNodeMap = new HashMap<>();
+
+   // construct AlignedAggregationTreeDeviceViewScanNode and
+   // NonAlignedAggregationTreeDeviceViewScanNode for each region.
+   // prepareAggregationDistribution returns a single NOT_ASSIGNED placeholder slot when there is
+   // no device at all, so the index must also be bounded by the number of device entries.
+   for (int i = 0; i < regionReplicaSetsList.size() && i < deviceEntries.size(); i++) {
+     DeviceEntry deviceEntry = deviceEntries.get(i);
+     List<TRegionReplicaSet> regionReplicaSets = regionReplicaSetsList.get(i);
+
+     for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+       boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
+       Pair<AlignedAggregationTreeDeviceViewScanNode, NonAlignedAggregationTreeDeviceViewScanNode>
+           pair = tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new Pair<>(null, null));
+
+       if (aligned && pair.left == null) {
+         AlignedAggregationTreeDeviceViewScanNode scanNode =
+             new AlignedAggregationTreeDeviceViewScanNode(
+                 queryId.genPlanNodeId(),
+                 templateNode.getQualifiedObjectName(),
+                 templateNode.getOutputSymbols(),
+                 templateNode.getAssignments(),
+                 new ArrayList<>(),
+                 templateNode.getTagAndAttributeIndexMap(),
+                 templateNode.getScanOrder(),
+                 templateNode.getTimePredicate().orElse(null),
+                 templateNode.getPushDownPredicate(),
+                 templateNode.getPushDownLimit(),
+                 templateNode.getPushDownOffset(),
+                 templateNode.isPushLimitToEachDevice(),
+                 templateNode.containsNonAlignedDevice(),
+                 templateNode.getProjection(),
+                 templateNode.getAggregations(),
+                 templateNode.getGroupingSets(),
+                 templateNode.getPreGroupedSymbols(),
+                 templateNode.getStep(),
+                 templateNode.getGroupIdSymbol(),
+                 node.getTreeDBName(),
+                 node.getMeasurementColumnNameMap());
+         scanNode.setRegionReplicaSet(regionReplicaSet);
+         pair.left = scanNode;
+       }
+
+       if (!aligned && pair.right == null) {
+         NonAlignedAggregationTreeDeviceViewScanNode scanNode =
+             new NonAlignedAggregationTreeDeviceViewScanNode(
+                 queryId.genPlanNodeId(),
+                 templateNode.getQualifiedObjectName(),
+                 templateNode.getOutputSymbols(),
+                 templateNode.getAssignments(),
+                 new ArrayList<>(),
+                 templateNode.getTagAndAttributeIndexMap(),
+                 templateNode.getScanOrder(),
+                 templateNode.getTimePredicate().orElse(null),
+                 templateNode.getPushDownPredicate(),
+                 templateNode.getPushDownLimit(),
+                 templateNode.getPushDownOffset(),
+                 templateNode.isPushLimitToEachDevice(),
+                 templateNode.containsNonAlignedDevice(),
+                 templateNode.getProjection(),
+                 templateNode.getAggregations(),
+                 templateNode.getGroupingSets(),
+                 templateNode.getPreGroupedSymbols(),
+                 templateNode.getStep(),
+                 templateNode.getGroupIdSymbol(),
+                 node.getTreeDBName(),
+                 node.getMeasurementColumnNameMap());
+         scanNode.setRegionReplicaSet(regionReplicaSet);
+         pair.right = scanNode;
+       }
+
+       if (aligned) {
+         pair.left.appendDeviceEntry(deviceEntry);
+       } else {
+         pair.right.appendDeviceEntry(deviceEntry);
+       }
+     }
+   }
+
+   if (tableScanNodeMap.isEmpty()) {
+     // No device could be bound to a region. Return a concrete node type: the base
+     // AggregationTreeDeviceViewScanNode throws when asked for its PlanNodeType.
+     node.setRegionReplicaSet(NOT_ASSIGNED);
+     return Collections.singletonList(createUnassignedAlignedScanNode(node));
+   }
+
+   List<PlanNode> resultTableScanNodeList = new ArrayList<>();
+   TRegionReplicaSet mostUsedDataRegion = null;
+   int maxDeviceEntrySizeOfTableScan = 0;
+   for (Map.Entry<
+           TRegionReplicaSet,
+           Pair<
+               AlignedAggregationTreeDeviceViewScanNode,
+               NonAlignedAggregationTreeDeviceViewScanNode>>
+       entry : topology.filterReachableCandidates(tableScanNodeMap.entrySet())) {
+     TRegionReplicaSet regionReplicaSet = entry.getKey();
+     Pair<AlignedAggregationTreeDeviceViewScanNode, NonAlignedAggregationTreeDeviceViewScanNode>
+         pair = entry.getValue();
+     int currentDeviceEntrySize = 0;
+
+     if (pair.left != null) {
+       currentDeviceEntrySize += pair.left.getDeviceEntries().size();
+       resultTableScanNodeList.add(pair.left);
+     }
+
+     if (pair.right != null) {
+       currentDeviceEntrySize += pair.right.getDeviceEntries().size();
+       resultTableScanNodeList.add(pair.right);
+     }
+
+     // The region serving the most devices (aligned + non-aligned) becomes the preferred one.
+     if (mostUsedDataRegion == null || currentDeviceEntrySize > maxDeviceEntrySizeOfTableScan) {
+       mostUsedDataRegion = regionReplicaSet;
+       maxDeviceEntrySizeOfTableScan = currentDeviceEntrySize;
+     }
+   }
+   if (mostUsedDataRegion == null) {
+     // Every candidate region was filtered out as unreachable.
+     throw new RootFIPlacementException(tableScanNodeMap.keySet());
+   }
+   context.mostUsedRegion = mostUsedDataRegion;
+
+   if (context.hasSortProperty) {
+     processSortProperty(node, resultTableScanNodeList, context);
+   }
+
if (needSplit) {
if (resultTableScanNodeList.size() == 1) {
finalAggregation.setChild(resultTableScanNodeList.get(0));
@@ -1366,6 +1500,83 @@ public class TableDistributedPlanGenerator
return resultTableScanNodeList;
}
+ /**
+  * Read-only bundle produced by {@code prepareAggregationDistribution} and consumed by the
+  * aggregation scan visitors.
+  */
+ private static class AggregationDistributionInfo {
+   /** Replica sets per device entry, in device-entry order (or one placeholder slot). */
+   private final List<List<TRegionReplicaSet>> regionReplicaSetsList;
+
+   /** Node whose attributes are copied into each per-region scan node. */
+   private final AggregationTableScanNode templateNode;
+
+   /** Final-stage aggregation; {@code null} when the scan is not split. */
+   private final AggregationNode finalAggregation;
+
+   /** Whether the scan was split into partial and final aggregation. */
+   private final boolean needSplit;
+
+   AggregationDistributionInfo(
+       List<List<TRegionReplicaSet>> replicaSetsPerDevice,
+       AggregationTableScanNode template,
+       AggregationNode finalStage,
+       boolean split) {
+     this.needSplit = split;
+     this.finalAggregation = finalStage;
+     this.templateNode = template;
+     this.regionReplicaSetsList = replicaSetsPerDevice;
+   }
+ }
+
+ /**
+  * Resolves, for every device entry of {@code node}, the region replica sets that hold its data
+  * and decides whether the aggregation has to be split into a partial and a final stage.
+  *
+  * <p>Side effects: when a device maps to more than one replica set, {@code
+  * context.deviceCrossRegion} is set and the query context is told whether the scan number for
+  * last queries needs updating. When a split happens and the final aggregation is streamable
+  * without an existing sort property, an expected ordering scheme is stored on {@code context}.
+  *
+  * @param node the aggregation scan to distribute
+  * @param dbName database whose partition map is consulted
+  * @param dataPartition data partition information of the current analysis
+  * @param context distribution context, updated as described above
+  * @return replica sets per device, the template node for per-region scans, the final
+  *     aggregation (or {@code null}) and the split flag
+  * @throws SemanticException if {@code dbName} has no entry in the data partition map
+  */
+ private AggregationDistributionInfo prepareAggregationDistribution(
+     AggregationTableScanNode node,
+     String dbName,
+     DataPartition dataPartition,
+     PlanContext context) {
+   Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap =
+       dataPartition.getDataPartitionMap().get(dbName);
+   if (seriesSlotMap == null) {
+     throw new SemanticException(
+         String.format("Given queried database: %s is not exist!", dbName));
+   }
+
+   boolean anyDeviceCrossesRegions = false;
+   List<List<TRegionReplicaSet>> replicaSetsPerDevice = new ArrayList<>();
+   Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new HashMap<>();
+   for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
+     List<TRegionReplicaSet> deviceReplicaSets =
+         getDeviceReplicaSets(
+             dataPartition,
+             seriesSlotMap,
+             deviceEntry.getDeviceID(),
+             node.getTimeFilter(),
+             cachedSeriesSlotWithRegions);
+     if (deviceReplicaSets.size() > 1) {
+       anyDeviceCrossesRegions = true;
+       context.deviceCrossRegion = true;
+       queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache());
+     }
+     replicaSetsPerDevice.add(deviceReplicaSets);
+   }
+
+   // No device at all: keep one placeholder slot so callers still get a NOT_ASSIGNED region.
+   if (replicaSetsPerDevice.isEmpty()) {
+     replicaSetsPerDevice = Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
+   }
+
+   // Step is SINGLE and device data in more than one region: the results of the different
+   // regions need a final aggregation, so the node is split into two stages.
+   boolean needSplit = anyDeviceCrossesRegions && node.getStep() == SINGLE;
+   if (!needSplit) {
+     return new AggregationDistributionInfo(replicaSetsPerDevice, node, null, false);
+   }
+
+   Pair<AggregationNode, AggregationTableScanNode> splitResult =
+       split(node, symbolAllocator, queryId);
+   AggregationNode finalAggregation = splitResult.left;
+   AggregationTableScanNode partialAggregation = splitResult.right;
+
+   // cover case: complete push-down + group by + streamable
+   if (!context.hasSortProperty && finalAggregation.isStreamable()) {
+     context.setExpectedOrderingScheme(constructOrderingSchema(node.getPreGroupedSymbols()));
+   }
+
+   return new AggregationDistributionInfo(
+       replicaSetsPerDevice, partialAggregation, finalAggregation, true);
+ }
+
private List<TRegionReplicaSet> getDeviceReplicaSets(
DataPartition dataPartition,
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap,
@@ -1479,14 +1690,6 @@ public class TableDistributedPlanGenerator
List<List<TRegionReplicaSet>> regionReplicaSetsList,
Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap,
AggregationTableScanNode partialAggTableScanNode) {
- AggregationTreeDeviceViewScanNode aggregationTreeDeviceViewScanNode;
- if (originalAggTableScanNode instanceof AggregationTreeDeviceViewScanNode)
{
- aggregationTreeDeviceViewScanNode =
- (AggregationTreeDeviceViewScanNode) originalAggTableScanNode;
- } else {
- aggregationTreeDeviceViewScanNode = null;
- }
-
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
@@ -1494,49 +1697,26 @@ public class TableDistributedPlanGenerator
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
- (aggregationTreeDeviceViewScanNode == null)
- ? new AggregationTableScanNode(
- queryId.genPlanNodeId(),
- partialAggTableScanNode.getQualifiedObjectName(),
- partialAggTableScanNode.getOutputSymbols(),
- partialAggTableScanNode.getAssignments(),
- new ArrayList<>(),
-
partialAggTableScanNode.getTagAndAttributeIndexMap(),
- partialAggTableScanNode.getScanOrder(),
-
partialAggTableScanNode.getTimePredicate().orElse(null),
- partialAggTableScanNode.getPushDownPredicate(),
- partialAggTableScanNode.getPushDownLimit(),
- partialAggTableScanNode.getPushDownOffset(),
-
partialAggTableScanNode.isPushLimitToEachDevice(),
-
partialAggTableScanNode.containsNonAlignedDevice(),
- partialAggTableScanNode.getProjection(),
- partialAggTableScanNode.getAggregations(),
- partialAggTableScanNode.getGroupingSets(),
- partialAggTableScanNode.getPreGroupedSymbols(),
- partialAggTableScanNode.getStep(),
- partialAggTableScanNode.getGroupIdSymbol())
- : new AggregationTreeDeviceViewScanNode(
- queryId.genPlanNodeId(),
- partialAggTableScanNode.getQualifiedObjectName(),
- partialAggTableScanNode.getOutputSymbols(),
- partialAggTableScanNode.getAssignments(),
- new ArrayList<>(),
-
partialAggTableScanNode.getTagAndAttributeIndexMap(),
- partialAggTableScanNode.getScanOrder(),
-
partialAggTableScanNode.getTimePredicate().orElse(null),
- partialAggTableScanNode.getPushDownPredicate(),
- partialAggTableScanNode.getPushDownLimit(),
- partialAggTableScanNode.getPushDownOffset(),
-
partialAggTableScanNode.isPushLimitToEachDevice(),
-
partialAggTableScanNode.containsNonAlignedDevice(),
- partialAggTableScanNode.getProjection(),
- partialAggTableScanNode.getAggregations(),
- partialAggTableScanNode.getGroupingSets(),
- partialAggTableScanNode.getPreGroupedSymbols(),
- partialAggTableScanNode.getStep(),
- partialAggTableScanNode.getGroupIdSymbol(),
-
aggregationTreeDeviceViewScanNode.getTreeDBName(),
-
aggregationTreeDeviceViewScanNode.getMeasurementColumnNameMap());
+ new AggregationTableScanNode(
+ queryId.genPlanNodeId(),
+ partialAggTableScanNode.getQualifiedObjectName(),
+ partialAggTableScanNode.getOutputSymbols(),
+ partialAggTableScanNode.getAssignments(),
+ new ArrayList<>(),
+ partialAggTableScanNode.getTagAndAttributeIndexMap(),
+ partialAggTableScanNode.getScanOrder(),
+
partialAggTableScanNode.getTimePredicate().orElse(null),
+ partialAggTableScanNode.getPushDownPredicate(),
+ partialAggTableScanNode.getPushDownLimit(),
+ partialAggTableScanNode.getPushDownOffset(),
+ partialAggTableScanNode.isPushLimitToEachDevice(),
+ partialAggTableScanNode.containsNonAlignedDevice(),
+ partialAggTableScanNode.getProjection(),
+ partialAggTableScanNode.getAggregations(),
+ partialAggTableScanNode.getGroupingSets(),
+ partialAggTableScanNode.getPreGroupedSymbols(),
+ partialAggTableScanNode.getStep(),
+ partialAggTableScanNode.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
index b0fd554de31..1635f2d7345 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
@@ -91,7 +91,7 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
this.measurementColumnNameMap = measurementColumnNameMap;
}
- private AggregationTreeDeviceViewScanNode() {}
+ protected AggregationTreeDeviceViewScanNode() {}
public String getTreeDBName() {
return treeDBName;
@@ -158,9 +158,14 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
measurementColumnNameMap);
}
+ /**
+  * Supplies the {@code PlanNodeType} tag that both {@code serializeAttributes} overloads write
+  * first. This implementation always throws, so an instance of this exact class cannot be
+  * serialized (subclasses are presumably expected to override it with their own type).
+  *
+  * @return never returns normally
+  * @throws UnsupportedOperationException always
+  */
+ protected PlanNodeType getPlanNodeType() {
+   // Serialization/deserialization is not supported for this node.
+   throw new UnsupportedOperationException("Not supported yet.");
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE.serialize(byteBuffer);
+ getPlanNodeType().serialize(byteBuffer);
AggregationTableScanNode.serializeMemberVariables(this, byteBuffer);
@@ -174,7 +179,7 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
- PlanNodeType.AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE.serialize(stream);
+ getPlanNodeType().serialize(stream);
AggregationTableScanNode.serializeMemberVariables(this, stream);
@@ -186,8 +191,8 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
}
}
- public static AggregationTreeDeviceViewScanNode deserialize(ByteBuffer
byteBuffer) {
- AggregationTreeDeviceViewScanNode node = new
AggregationTreeDeviceViewScanNode();
+ protected static AggregationTreeDeviceViewScanNode deserialize(
+ ByteBuffer byteBuffer, AggregationTreeDeviceViewScanNode node) {
AggregationTableScanNode.deserializeMemberVariables(byteBuffer, node);
node.treeDBName = ReadWriteIOUtils.readString(byteBuffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java
similarity index 52%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java
index b0fd554de31..f1758f2de8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java
@@ -30,22 +30,14 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode {
- private String treeDBName;
- private Map<String, String> measurementColumnNameMap;
+public class AlignedAggregationTreeDeviceViewScanNode extends
AggregationTreeDeviceViewScanNode {
- public AggregationTreeDeviceViewScanNode(
+ public AlignedAggregationTreeDeviceViewScanNode(
PlanNodeId id,
QualifiedObjectName qualifiedObjectName,
List<Symbol> outputSymbols,
@@ -86,56 +78,22 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
groupingSets,
preGroupedSymbols,
step,
- groupIdSymbol);
- this.treeDBName = treeDBName;
- this.measurementColumnNameMap = measurementColumnNameMap;
- }
-
- private AggregationTreeDeviceViewScanNode() {}
-
- public String getTreeDBName() {
- return treeDBName;
+ groupIdSymbol,
+ treeDBName,
+ measurementColumnNameMap);
}
- public Map<String, String> getMeasurementColumnNameMap() {
- return measurementColumnNameMap;
- }
+ private AlignedAggregationTreeDeviceViewScanNode() {}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitAggregationTreeDeviceViewScan(this, context);
+ return visitor.visitAlignedAggregationTreeDeviceViewScan(this, context);
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- AggregationTreeDeviceViewScanNode that =
(AggregationTreeDeviceViewScanNode) o;
- return Objects.equals(treeDBName, that.treeDBName)
- && Objects.equals(measurementColumnNameMap,
that.measurementColumnNameMap);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), treeDBName,
measurementColumnNameMap);
- }
-
- @Override
- public String toString() {
- return "AggregationTreeDeviceViewTableScanNode-" + this.getPlanNodeId();
- }
-
- @Override
- public AggregationTreeDeviceViewScanNode clone() {
- return new AggregationTreeDeviceViewScanNode(
- id,
+ public AlignedAggregationTreeDeviceViewScanNode clone() {
+ return new AlignedAggregationTreeDeviceViewScanNode(
+ getPlanNodeId(),
qualifiedObjectName,
outputSymbols,
assignments,
@@ -154,52 +112,21 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
preGroupedSymbols,
step,
groupIdSymbol,
- treeDBName,
- measurementColumnNameMap);
- }
-
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE.serialize(byteBuffer);
-
- AggregationTableScanNode.serializeMemberVariables(this, byteBuffer);
-
- ReadWriteIOUtils.write(treeDBName, byteBuffer);
- ReadWriteIOUtils.write(measurementColumnNameMap.size(), byteBuffer);
- for (Map.Entry<String, String> entry :
measurementColumnNameMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
- ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
- }
+ getTreeDBName(),
+ getMeasurementColumnNameMap());
}
- @Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {
- PlanNodeType.AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE.serialize(stream);
-
- AggregationTableScanNode.serializeMemberVariables(this, stream);
-
- ReadWriteIOUtils.write(treeDBName, stream);
- ReadWriteIOUtils.write(measurementColumnNameMap.size(), stream);
- for (Map.Entry<String, String> entry :
measurementColumnNameMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), stream);
- ReadWriteIOUtils.write(entry.getValue(), stream);
- }
+ protected PlanNodeType getPlanNodeType() {
+ return PlanNodeType.ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE;
}
public static AggregationTreeDeviceViewScanNode deserialize(ByteBuffer
byteBuffer) {
- AggregationTreeDeviceViewScanNode node = new
AggregationTreeDeviceViewScanNode();
- AggregationTableScanNode.deserializeMemberVariables(byteBuffer, node);
-
- node.treeDBName = ReadWriteIOUtils.readString(byteBuffer);
- int size = ReadWriteIOUtils.readInt(byteBuffer);
- Map<String, String> measurementColumnNameMap = new HashMap<>(size);
- for (int i = 0; i < size; i++) {
- measurementColumnNameMap.put(
- ReadWriteIOUtils.readString(byteBuffer),
ReadWriteIOUtils.readString(byteBuffer));
- }
- node.measurementColumnNameMap = measurementColumnNameMap;
+ return AggregationTreeDeviceViewScanNode.deserialize(
+ byteBuffer, new AlignedAggregationTreeDeviceViewScanNode());
+ }
- node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
- return node;
+ @Override
+ public String toString() {
+ return "AlignedAggregationTreeDeviceViewScanNode-" + this.getPlanNodeId();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java
similarity index 52%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java
index b0fd554de31..8380f56164d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTreeDeviceViewScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java
@@ -30,22 +30,14 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode {
- private String treeDBName;
- private Map<String, String> measurementColumnNameMap;
+public class NonAlignedAggregationTreeDeviceViewScanNode extends
AggregationTreeDeviceViewScanNode {
- public AggregationTreeDeviceViewScanNode(
+ public NonAlignedAggregationTreeDeviceViewScanNode(
PlanNodeId id,
QualifiedObjectName qualifiedObjectName,
List<Symbol> outputSymbols,
@@ -86,56 +78,22 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
groupingSets,
preGroupedSymbols,
step,
- groupIdSymbol);
- this.treeDBName = treeDBName;
- this.measurementColumnNameMap = measurementColumnNameMap;
- }
-
- private AggregationTreeDeviceViewScanNode() {}
-
- public String getTreeDBName() {
- return treeDBName;
+ groupIdSymbol,
+ treeDBName,
+ measurementColumnNameMap);
}
- public Map<String, String> getMeasurementColumnNameMap() {
- return measurementColumnNameMap;
- }
+ private NonAlignedAggregationTreeDeviceViewScanNode() {}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitAggregationTreeDeviceViewScan(this, context);
+ return visitor.visitNonAlignedAggregationTreeDeviceViewScan(this, context);
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- AggregationTreeDeviceViewScanNode that =
(AggregationTreeDeviceViewScanNode) o;
- return Objects.equals(treeDBName, that.treeDBName)
- && Objects.equals(measurementColumnNameMap,
that.measurementColumnNameMap);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), treeDBName,
measurementColumnNameMap);
- }
-
- @Override
- public String toString() {
- return "AggregationTreeDeviceViewTableScanNode-" + this.getPlanNodeId();
- }
-
- @Override
- public AggregationTreeDeviceViewScanNode clone() {
- return new AggregationTreeDeviceViewScanNode(
- id,
+ public NonAlignedAggregationTreeDeviceViewScanNode clone() {
+ return new NonAlignedAggregationTreeDeviceViewScanNode(
+ getPlanNodeId(),
qualifiedObjectName,
outputSymbols,
assignments,
@@ -154,52 +112,21 @@ public class AggregationTreeDeviceViewScanNode extends
AggregationTableScanNode
preGroupedSymbols,
step,
groupIdSymbol,
- treeDBName,
- measurementColumnNameMap);
- }
-
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE.serialize(byteBuffer);
-
- AggregationTableScanNode.serializeMemberVariables(this, byteBuffer);
-
- ReadWriteIOUtils.write(treeDBName, byteBuffer);
- ReadWriteIOUtils.write(measurementColumnNameMap.size(), byteBuffer);
- for (Map.Entry<String, String> entry :
measurementColumnNameMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
- ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
- }
+ getTreeDBName(),
+ getMeasurementColumnNameMap());
}
- @Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {
- PlanNodeType.AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE.serialize(stream);
-
- AggregationTableScanNode.serializeMemberVariables(this, stream);
-
- ReadWriteIOUtils.write(treeDBName, stream);
- ReadWriteIOUtils.write(measurementColumnNameMap.size(), stream);
- for (Map.Entry<String, String> entry :
measurementColumnNameMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), stream);
- ReadWriteIOUtils.write(entry.getValue(), stream);
- }
+ protected PlanNodeType getPlanNodeType() {
+ return PlanNodeType.NON_ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE;
}
public static AggregationTreeDeviceViewScanNode deserialize(ByteBuffer
byteBuffer) {
- AggregationTreeDeviceViewScanNode node = new
AggregationTreeDeviceViewScanNode();
- AggregationTableScanNode.deserializeMemberVariables(byteBuffer, node);
-
- node.treeDBName = ReadWriteIOUtils.readString(byteBuffer);
- int size = ReadWriteIOUtils.readInt(byteBuffer);
- Map<String, String> measurementColumnNameMap = new HashMap<>(size);
- for (int i = 0; i < size; i++) {
- measurementColumnNameMap.put(
- ReadWriteIOUtils.readString(byteBuffer),
ReadWriteIOUtils.readString(byteBuffer));
- }
- node.measurementColumnNameMap = measurementColumnNameMap;
+ return AggregationTreeDeviceViewScanNode.deserialize(
+ byteBuffer, new NonAlignedAggregationTreeDeviceViewScanNode());
+ }
- node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
- return node;
+ @Override
+ public String toString() {
+ return "NonAlignedAggregationTreeDeviceViewScanNode-" +
this.getPlanNodeId();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
index a47794fd329..3bcb6f1935b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
@@ -105,10 +105,6 @@ public class PushAggregationIntoTableScan implements
PlanOptimizer {
return node;
}
- if (tableScanNode.containsNonAlignedDevice()) {
- return node;
- }
-
PushDownLevel pushDownLevel =
calculatePushDownLevel(
node.getAggregations().values(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
index 951c2b895a1..082cf344485 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
@@ -124,6 +124,11 @@ public class DeviceIteratorScanOperatorTest {
DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator generator =
new DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator() {
+ @Override
+ public long ramBytesUsed() {
+ return 0L;
+ }
+
private Operator currentDeviceRootOperator;
@Override
@@ -132,7 +137,8 @@ public class DeviceIteratorScanOperatorTest {
}
@Override
- public void generateCurrentDeviceOperatorTree(DeviceEntry
deviceEntry) {
+ public void generateCurrentDeviceOperatorTree(
+ DeviceEntry deviceEntry, boolean needAdaptor) {
AlignedFullPath alignedPath =
new AlignedFullPath(
deviceEntry.getDeviceID(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
index 351bcb7f605..1dcd7d5487a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
@@ -140,7 +140,7 @@ public class TreeViewTest {
public void aggregationQueryTest() {
PlanTester planTester = new PlanTester();
- // has non-aligned DeviceEntry, no push-down
+ // has non-aligned DeviceEntry
LogicalQueryPlan logicalQueryPlan =
planTester.createPlan(
"select tag1, count(s1) from "
@@ -149,14 +149,83 @@ public class TreeViewTest {
PlanMatchPattern expectedPlanPattern =
output(
aggregation(
- ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("s1"))),
- treeDeviceViewTableScan(
+ ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("count_0"))),
+ aggregationTreeDeviceViewTableScan(
+ singleGroupingSet("tag1"),
+ ImmutableList.of("tag1"),
+ Optional.empty(),
+ PARTIAL,
DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
- ImmutableList.of("tag1", "s1"),
+ ImmutableList.of("tag1", "count_0"),
ImmutableSet.of("tag1", "s1"))));
assertPlan(logicalQueryPlan, expectedPlanPattern);
- // only aligned DeviceEntry, do push-down
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(
+ aggregation(
+ ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("count_1"))),
+ FINAL,
+ mergeSort(exchange(), exchange(), exchange(), exchange()))));
+
+ assertPlan(
+ planTester.getFragmentPlan(1),
+ aggregation(
+ ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("count_0"))),
+ INTERMEDIATE,
+ aggregationTreeDeviceViewTableScan(
+ singleGroupingSet("tag1"),
+ ImmutableList.of("tag1"),
+ Optional.empty(),
+ PARTIAL,
+ DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+ ImmutableList.of("tag1", "count_0"),
+ ImmutableSet.of("tag1", "s1"),
+ true)));
+ assertPlan(
+ planTester.getFragmentPlan(2),
+ aggregation(
+ ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("count_0"))),
+ INTERMEDIATE,
+ aggregationTreeDeviceViewTableScan(
+ singleGroupingSet("tag1"),
+ ImmutableList.of("tag1"),
+ Optional.empty(),
+ PARTIAL,
+ DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+ ImmutableList.of("tag1", "count_0"),
+ ImmutableSet.of("tag1", "s1"),
+ false)));
+ assertPlan(
+ planTester.getFragmentPlan(3),
+ aggregation(
+ ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("count_0"))),
+ INTERMEDIATE,
+ aggregationTreeDeviceViewTableScan(
+ singleGroupingSet("tag1"),
+ ImmutableList.of("tag1"),
+ Optional.empty(),
+ PARTIAL,
+ DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+ ImmutableList.of("tag1", "count_0"),
+ ImmutableSet.of("tag1", "s1"),
+ true)));
+ assertPlan(
+ planTester.getFragmentPlan(4),
+ aggregation(
+ ImmutableMap.of("count", aggregationFunction("count",
ImmutableList.of("count_0"))),
+ INTERMEDIATE,
+ aggregationTreeDeviceViewTableScan(
+ singleGroupingSet("tag1"),
+ ImmutableList.of("tag1"),
+ Optional.empty(),
+ PARTIAL,
+ DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+ ImmutableList.of("tag1", "count_0"),
+ ImmutableSet.of("tag1", "s1"),
+ false)));
+
+ // only aligned DeviceEntry
logicalQueryPlan =
planTester.createPlan(
"select tag1, count(s1) from "
@@ -199,7 +268,8 @@ public class TreeViewTest {
PARTIAL,
DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
ImmutableList.of("tag1", "count_0"),
- ImmutableSet.of("tag1", "s1"))));
+ ImmutableSet.of("tag1", "s1"),
+ true)));
}
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec2..2ca57e5296a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupRe
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
@@ -43,6 +44,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
@@ -429,6 +431,38 @@ public final class PlanMatchPattern {
return result;
}
+ public static PlanMatchPattern aggregationTreeDeviceViewTableScan(
+ GroupingSetDescriptor groupingSets,
+ List<String> preGroupedSymbols,
+ Optional<Symbol> groupId,
+ AggregationNode.Step step,
+ String expectedTableName,
+ List<String> outputSymbols,
+ Set<String> assignmentsKeys,
+ boolean aligned) {
+ PlanMatchPattern result =
+ aligned
+ ? node(AlignedAggregationTreeDeviceViewScanNode.class)
+ : node(NonAlignedAggregationTreeDeviceViewScanNode.class);
+
+ result.with(
+ new AggregationDeviceTableScanMatcher(
+ groupingSets,
+ preGroupedSymbols,
+ ImmutableList.of(),
+ groupId,
+ step,
+ expectedTableName,
+ Optional.empty(),
+ outputSymbols,
+ assignmentsKeys));
+
+ outputSymbols.forEach(
+ outputSymbol ->
+ result.withAlias(outputSymbol, new
ColumnReference(expectedTableName, outputSymbol)));
+ return result;
+ }
+
// Attention: Now we only pass aliases according to outputSymbols, but we
don't verify the output
// column if exists in Table and their order because there maybe partial
Agg-result.
public static PlanMatchPattern aggregationTableScan(