This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch agg_table_scan in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d4defa7e0a8038b77187c1f0639c0c6275b2a2d1 Author: Beyyes <[email protected]> AuthorDate: Thu Oct 10 12:29:53 2024 +0800 add basic agg table scan impl --- .../TableAggregationTableScanOperator.java | 33 +++++++++-- .../plan/planner/TableOperatorGenerator.java | 67 +++++++++++++--------- .../TableModelTypeProviderExtractor.java | 16 +++++- 3 files changed, 82 insertions(+), 34 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index 2793a81325c..6829c9c0fa6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -44,6 +44,7 @@ import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.LongColumn; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static java.lang.String.format; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.calculateAggregationFromRawData; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult; @@ -161,13 +163,14 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation @Override public TsBlock next() throws Exception { // start stopwatch, reset leftRuntimeOfOneNextCall + + // TODO add maxRunTime optimization long start = System.nanoTime(); // leftRuntimeOfOneNextCall = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); // long maxRuntime = leftRuntimeOfOneNextCall; - - while ( // System.nanoTime() - start < maxRuntime&& - (curTimeRange != null || timeRangeIterator.hasNextTimeRange()) + + while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange()) && !resultTsBlockBuilder.isFull()) { if (curTimeRange == null) { // move to the next time window @@ -181,14 +184,33 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation // calculate aggregation result on current time window // Keep curTimeRange if the calculation of this timeRange is not done if (calculateAggregationResultForCurrentTimeRange()) { + updateResultTsBlock(); curTimeRange = null; } } if (resultTsBlockBuilder.getPositionCount() > 0) { - TsBlock resultTsBlock = resultTsBlockBuilder.build(); + int declaredPositions = resultTsBlockBuilder.getPositionCount(); + ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + Column[] valueColumns = new Column[valueColumnBuilders.length]; + for (int i = 0; i < valueColumns.length; i++) { + valueColumns[i] = valueColumnBuilders[i].build(); + if (valueColumns[i].getPositionCount() != declaredPositions) { + throw new IllegalStateException( + format( + "Declared positions (%s) does not match column %s's number of entries (%s)", + declaredPositions, i, valueColumns[i].getPositionCount())); + } + } + + this.resultTsBlock = + new TsBlock( + resultTsBlockBuilder.getPositionCount(), + new RunLengthEncodedColumn( + TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), + valueColumns); resultTsBlockBuilder.reset(); - return resultTsBlock; + return this.resultTsBlock; } else { return null; } @@ -254,7 +276,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } else { currentDeviceIndex++; } - updateResultTsBlock(); if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index c4e1bc9ee8a..43a4defd960 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -109,6 +109,7 @@ import javax.validation.constraints.NotNull; import java.io.File; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -963,7 +964,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new AggregationOperator(context, child, aggregatorBuilder.build()); } - private ImmutableMap<Symbol, Integer> makeLayoutFromOutputSymbols(List<Symbol> outputSymbols) { + private ImmutableMap<Symbol, Integer> makeLayoutFromOutputSymbols( + Collection<Symbol> outputSymbols) { ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder(); int channel = 0; for (Symbol symbol : outputSymbols) { @@ -1007,7 +1009,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new TableAggregator( accumulator, step, - getTSDataType(typeProvider.getTableModelType(aggregationSymbol)), + getTSDataType(aggregation.getResolvedFunction().getSignature().getReturnType()), + // getTSDataType(typeProvider.getTableModelType(aggregationSymbol)), argumentChannels, OptionalInt.empty()); } @@ -1025,8 +1028,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution List<TableAggregator> aggregators = new ArrayList<>(); - // TODO fix childLayout - Map<Symbol, Integer> childLayout = new HashMap<>(); + // TODO how to use the output symbols of AggregationTableScan? + Map<Symbol, Integer> childLayout = makeLayoutFromOutputSymbols(node.getAssignments().keySet()); for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : node.getAggregations().entrySet()) { TableAggregator aggregator = @@ -1039,7 +1042,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution aggregators.add(aggregator); } - List<Symbol> outputColumnNames = node.getOutputSymbols(); + Collection<Symbol> outputColumnNames = node.getAssignments().keySet(); int outputColumnCount = outputColumnNames.size(); List<ColumnSchema> columnSchemas = new ArrayList<>(outputColumnCount); int[] columnsIndexArray = new int[outputColumnCount]; @@ -1109,28 +1112,40 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution ITimeRangeIterator timeRangeIterator = new SingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE); - return new TableAggregationTableScanOperator( - node.getPlanNodeId(), - operatorContext, - columnSchemas, - columnsIndexArray, - measurementColumnCount, - node.getDeviceEntries(), - node.getScanOrder(), - scanOptionsBuilder.build(), - measurementColumnNames, - measurementSchemas, - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), - // TODO if it equals subSensor variable - measurementColumnCount, - aggregators, - timeRangeIterator, - false, - null, - calculateMaxAggregationResultSize(), - true); + TableAggregationTableScanOperator aggTableScanOperator = + new TableAggregationTableScanOperator( + node.getPlanNodeId(), + operatorContext, + columnSchemas, + columnsIndexArray, + measurementColumnCount, + node.getDeviceEntries(), + node.getScanOrder(), + scanOptionsBuilder.build(), + measurementColumnNames, + measurementSchemas, + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), + // TODO if it equals subSensor variable + measurementColumnCount, + aggregators, + timeRangeIterator, + false, + null, + calculateMaxAggregationResultSize(), + true); + + ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); + + for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { + AlignedFullPath alignedPath = + constructAlignedPath( + node.getDeviceEntries().get(i), measurementColumnNames, measurementSchemas); + ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); + } + + context.getDriverContext().setInputDriver(true); - // throw new UnsupportedOperationException("Agg-BE not supported"); + return aggTableScanOperator; } public static long calculateMaxAggregationResultSize( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index a78f8695b07..cabcdffd869 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; @@ -73,6 +74,12 @@ public class TableModelTypeProviderExtractor { @Override public Void visitPlan(PlanNode node, Void context) { + addOutputSymbolsToTypeProvider(node); + node.getChildren().forEach(child -> child.accept(this, context)); + return null; + } + + private void addOutputSymbolsToTypeProvider(PlanNode node) { for (Symbol symbol : node.getOutputSymbols()) { if (!feTypeProvider.isSymbolExist(symbol)) { throw new IllegalStateException( @@ -82,8 +89,6 @@ public class TableModelTypeProviderExtractor { } beTypeProvider.putTableModelType(symbol, feTypeProvider.getTableModelType(symbol)); } - node.getChildren().forEach(child -> child.accept(this, context)); - return null; } @Override @@ -102,6 +107,13 @@ public class TableModelTypeProviderExtractor { return null; } + @Override + public Void visitAggregationTableScan(AggregationTableScanNode node, Void context) { + addOutputSymbolsToTypeProvider(node); + node.getAssignments().forEach((k, v) -> beTypeProvider.putTableModelType(k, v.getType())); + return null; + } + @Override public Void visitTableScan(TableScanNode node, Void context) { node.getAssignments().forEach((k, v) -> beTypeProvider.putTableModelType(k, v.getType()));
