This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggregationPushDown in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit edb5a4950ad3a163cfe378b0cb1a5aef2c5526b8 Author: liuminghui233 <[email protected]> AuthorDate: Tue Apr 23 23:56:10 2024 +0800 finish --- .../plan/optimization/AggregationPushDown.java | 303 ++++++++++++++++ .../plan/planner/LogicalPlanBuilder.java | 397 +-------------------- .../plan/planner/LogicalPlanVisitor.java | 181 +++------- .../queryengine/plan/planner/LogicalPlanner.java | 6 +- .../plan/planner/OperatorTreeGenerator.java | 248 +++++++------ .../planner/distribution/ExchangeNodeAdder.java | 6 + .../plan/planner/plan/node/PlanNodeType.java | 5 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/node/process/RawDataAggregationNode.java | 322 +++++++++++++++++ 9 files changed, 828 insertions(+), 645 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java new file mode 100644 index 00000000000..77d13471a95 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.optimization; + +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; +import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils; +import org.apache.iotdb.db.utils.SchemaUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; + +public class AggregationPushDown implements PlanOptimizer { + + @Override + public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) { + if (analysis.getStatement().getType() != StatementType.QUERY) { + return plan; + } + QueryStatement queryStatement = analysis.getQueryStatement(); + if (!queryStatement.isAggregationQuery() + || (queryStatement.isGroupBy() && !queryStatement.isGroupByTime()) + || cannotUseStatistics(queryStatement, analysis)) { + return plan; + } + return plan.accept( + new Rewriter(), new RewriterContext(analysis, context, queryStatement.isAlignByDevice())); + } + + private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis analysis) { + boolean isAlignByDevice = queryStatement.isAlignByDevice(); + if (isAlignByDevice) { + // check any of the devices + String device = analysis.getDeviceList().get(0).getDevice(); + return cannotUseStatistics( + analysis.getDeviceToAggregationExpressions().get(device), + analysis.getDeviceToSourceTransformExpressions().get(device)); + } else { + return cannotUseStatistics( + analysis.getAggregationExpressions(), analysis.getSourceTransformExpressions()); + } + } + + private boolean cannotUseStatistics( + Set<Expression> aggregationExpressions, Set<Expression> sourceTransformExpressions) { + for (Expression expression : aggregationExpressions) { + + if (expression instanceof FunctionExpression) { + FunctionExpression functionExpression = (FunctionExpression) expression; + // Disable statistics optimization of UDAF for now + if (functionExpression.isExternalAggregationFunctionExpression()) { + return true; + } + + if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) { + String alignedDeviceId = ""; + for (Expression countTimeExpression : sourceTransformExpressions) { + TimeSeriesOperand ts = (TimeSeriesOperand) countTimeExpression; + if (!(ts.getPath() instanceof AlignedPath + || ((MeasurementPath) ts.getPath()).isUnderAlignedEntity())) { + return true; + } + if (StringUtils.isEmpty(alignedDeviceId)) { + alignedDeviceId = ts.getPath().getDevice(); + } else if (!alignedDeviceId.equalsIgnoreCase(ts.getPath().getDevice())) { + // count_time from only one aligned device can use AlignedSeriesAggScan + return true; + } + } + return false; + } + + if (!BuiltinAggregationFunction.canUseStatistics(functionExpression.getFunctionName())) { + return true; + } + } else { + throw new IllegalArgumentException( + String.format("Invalid Aggregation Expression: %s", expression.getExpressionString())); + } + } + return false; + } + + private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> { + + @Override + public PlanNode visitPlan(PlanNode node, RewriterContext context) { + throw new IllegalArgumentException("Unexpected plan node: " + node); + } + + @Override + public PlanNode visitSingleChildProcess(SingleChildProcessNode node, RewriterContext context) { + PlanNode rewrittenChild = node.getChild().accept(this, context); + node.setChild(rewrittenChild); + return node; + } + + @Override + public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) { + List<PlanNode> rewrittenChildren = new ArrayList<>(); + for (PlanNode child : node.getChildren()) { + rewrittenChildren.add(child.accept(this, context)); + } + node.setChildren(rewrittenChildren); + return node; + } + + @Override + public PlanNode visitRawDataAggregation(RawDataAggregationNode node, RewriterContext context) { + PlanNode child = node.getChild(); + if (child instanceof FullOuterTimeJoinNode || child instanceof SeriesScanSourceNode) { + if (child instanceof SeriesScanSourceNode) { + // only one source, check partition + } + + List<AggregationDescriptor> aggregationDescriptorList = node.getAggregationDescriptorList(); + + boolean needCheckAscending = node.getGroupByTimeParameter() == null; + Map<PartialPath, List<AggregationDescriptor>> sourceToAscendingAggregationsMap = + new HashMap<>(); + Map<PartialPath, List<AggregationDescriptor>> sourceToDescendingAggregationsMap = + new HashMap<>(); + Map<PartialPath, List<AggregationDescriptor>> sourceToCountTimeAggregationsMap = + new HashMap<>(); + for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) { + checkArgument( + aggregationDescriptor.getInputExpressions().size() == 1 + && aggregationDescriptor.getInputExpressions().get(0) + instanceof TimeSeriesOperand); + PartialPath path = + ((TimeSeriesOperand) aggregationDescriptor.getInputExpressions().get(0)).getPath(); + if (aggregationDescriptor.getAggregationType().equals(TAggregationType.COUNT_TIME)) { + sourceToCountTimeAggregationsMap + .computeIfAbsent(path, key -> new ArrayList<>()) + .add(aggregationDescriptor); + } else if (SchemaUtils.isConsistentWithScanOrder( + aggregationDescriptor.getAggregationType(), node.getScanOrder())) { + sourceToAscendingAggregationsMap + .computeIfAbsent(path, key -> new ArrayList<>()) + .add(aggregationDescriptor); + } else { + sourceToDescendingAggregationsMap + .computeIfAbsent(path, key -> new ArrayList<>()) + .add(aggregationDescriptor); + } + } + + List<PlanNode> sourceNodeList = new ArrayList<>(); + Map<PartialPath, List<AggregationDescriptor>> groupedSourceToAscendingAggregations; + if (!sourceToCountTimeAggregationsMap.isEmpty()) { + groupedSourceToAscendingAggregations = sourceToCountTimeAggregationsMap; + } else { + groupedSourceToAscendingAggregations = + MetaUtils.groupAlignedAggregations(sourceToAscendingAggregationsMap); + } + for (Map.Entry<PartialPath, List<AggregationDescriptor>> sourceAggregationsEntry : + groupedSourceToAscendingAggregations.entrySet()) { + sourceNodeList.add( + createAggregationScanNode( + sourceAggregationsEntry.getKey(), + sourceAggregationsEntry.getValue(), + node.getScanOrder(), + node.getGroupByTimeParameter(), + context)); + } + if (needCheckAscending) { + Map<PartialPath, List<AggregationDescriptor>> groupedSourceToDescendingAggregations = + MetaUtils.groupAlignedAggregations(sourceToDescendingAggregationsMap); + for (Map.Entry<PartialPath, List<AggregationDescriptor>> sourceAggregationsEntry : + groupedSourceToDescendingAggregations.entrySet()) { + sourceNodeList.add( + createAggregationScanNode( + sourceAggregationsEntry.getKey(), + sourceAggregationsEntry.getValue(), + node.getScanOrder().reverse(), + null, + context)); + } + } + + PlanNode resultNode = convergeWithTimeJoin(sourceNodeList, node.getScanOrder(), context); + resultNode = planProject(resultNode, node.getChild(), context); + return resultNode; + } + // cannot push down + return node; + } + + private SeriesAggregationSourceNode createAggregationScanNode( + PartialPath selectPath, + List<AggregationDescriptor> aggregationDescriptorList, + Ordering scanOrder, + GroupByTimeParameter groupByTimeParameter, + RewriterContext context) { + if (selectPath instanceof MeasurementPath) { // non-aligned series + return new SeriesAggregationScanNode( + context.genPlanNodeId(), + (MeasurementPath) selectPath, + aggregationDescriptorList, + scanOrder, + groupByTimeParameter); + } else if (selectPath instanceof AlignedPath) { // aligned series + return new AlignedSeriesAggregationScanNode( + context.genPlanNodeId(), + (AlignedPath) selectPath, + aggregationDescriptorList, + scanOrder, + groupByTimeParameter); + } else { + throw new IllegalArgumentException("unexpected path type"); + } + } + + private PlanNode convergeWithTimeJoin( + List<PlanNode> sourceNodes, Ordering mergeOrder, RewriterContext context) { + PlanNode tmpNode; + if (sourceNodes.size() == 1) { + tmpNode = sourceNodes.get(0); + } else { + tmpNode = new FullOuterTimeJoinNode(context.genPlanNodeId(), mergeOrder, sourceNodes); + } + return tmpNode; + } + + private PlanNode planProject(PlanNode resultNode, PlanNode rawNode, RewriterContext context) { + if (context.isAlignByDevice() + && !rawNode.getOutputColumnNames().equals(resultNode.getOutputColumnNames())) { + return new ProjectNode(context.genPlanNodeId(), resultNode, rawNode.getOutputColumnNames()); + } + return resultNode; + } + } + + private static class RewriterContext { + + private final QueryId queryId; + private final boolean isAlignByDevice; + + public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) { + this.queryId = context.getQueryId(); + this.isAlignByDevice = isAlignByDevice; + } + + public PlanNodeId genPlanNodeId() { + return queryId.genPlanNodeId(); + } + + public boolean isAlignByDevice() { + return isAlignByDevice; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 15b890cb2ec..1f4dbb34b07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSchemaNode; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; @@ -56,19 +55,18 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; @@ -78,11 +76,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullO import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -103,8 +98,6 @@ import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.commons.lang3.Validate; @@ -113,7 +106,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -123,14 +115,12 @@ import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K; import static org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy.LINEAR; -import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE; import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME; @@ -378,7 +368,7 @@ public class LogicalPlanBuilder { .planRawDataSource( sourceExpressions, Ordering.DESC, 0, 0, analysis.isLastLevelUseWildcard()) .planWhereAndSourceTransform(null, sourceTransformExpressions, false, Ordering.DESC) - .planAggregation( + .planRawDataAggregation( new LinkedHashSet<>(Arrays.asList(maxTimeAgg, lastValueAgg)), null, analysis.getGroupByTimeParameter(), @@ -397,304 +387,6 @@ public class LogicalPlanBuilder { } } - public LogicalPlanBuilder planAggregationSource( - AggregationStep curStep, - Ordering scanOrder, - GroupByTimeParameter groupByTimeParameter, - Set<Expression> aggregationExpressions, - Set<Expression> sourceTransformExpressions, - Map<Expression, Set<Expression>> crossGroupByAggregations, - List<String> tagKeys, - Map<List<String>, LinkedHashMap<Expression, List<Expression>>> - tagValuesToGroupedTimeseriesOperands) { - boolean needCheckAscending = groupByTimeParameter == null; - Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>(); - Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>(); - Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new HashMap<>(); - for (Expression aggregationExpression : aggregationExpressions) { - createAggregationDescriptor( - (FunctionExpression) aggregationExpression, - curStep, - scanOrder, - needCheckAscending, - ascendingAggregations, - descendingAggregations, - countTimeAggregations); - } - - List<PlanNode> sourceNodeList = - constructSourceNodeFromAggregationDescriptors( - ascendingAggregations, - descendingAggregations, - countTimeAggregations, - scanOrder, - groupByTimeParameter); - updateTypeProvider(aggregationExpressions); - updateTypeProvider(sourceTransformExpressions); - - return convergeAggregationSource( - sourceNodeList, - curStep, - scanOrder, - groupByTimeParameter, - aggregationExpressions, - crossGroupByAggregations, - tagKeys, - tagValuesToGroupedTimeseriesOperands); - } - - public LogicalPlanBuilder planAggregationSourceWithIndexAdjust( - AggregationStep curStep, - Ordering scanOrder, - GroupByTimeParameter groupByTimeParameter, - Set<Expression> aggregationExpressions, - Set<Expression> sourceTransformExpressions, - Map<Expression, Set<Expression>> crossGroupByExpressions, - List<Integer> deviceViewInputIndexes, - boolean outputEndTime) { - checkArgument( - aggregationExpressions.size() <= deviceViewInputIndexes.size(), - "Each aggregate should correspond to a column of output."); - - boolean needCheckAscending = groupByTimeParameter == null; - Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>(); - Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>(); - Map<AggregationDescriptor, Integer> aggregationToIndexMap = new HashMap<>(); - Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new HashMap<>(); - - // If need output endTime, the first index is used by __endTime - int index = outputEndTime ? 1 : 0; - for (Expression aggregationExpression : aggregationExpressions) { - AggregationDescriptor aggregationDescriptor = - createAggregationDescriptor( - (FunctionExpression) aggregationExpression, - curStep, - scanOrder, - needCheckAscending, - ascendingAggregations, - descendingAggregations, - countTimeAggregations); - aggregationToIndexMap.put(aggregationDescriptor, deviceViewInputIndexes.get(index)); - index++; - } - - List<PlanNode> sourceNodeList = - constructSourceNodeFromAggregationDescriptors( - ascendingAggregations, - descendingAggregations, - countTimeAggregations, - scanOrder, - groupByTimeParameter); - updateTypeProvider(aggregationExpressions); - updateTypeProvider(sourceTransformExpressions); - - if (!curStep.isOutputPartial()) { - // update measurementIndexes - deviceViewInputIndexes.clear(); - if (outputEndTime) { - deviceViewInputIndexes.add(1); - } - deviceViewInputIndexes.addAll( - sourceNodeList.stream() - .map( - planNode -> - ((SeriesAggregationSourceNode) planNode).getAggregationDescriptorList()) - .flatMap(List::stream) - .map(aggregationToIndexMap::get) - .collect(Collectors.toList())); - } - - return convergeAggregationSource( - sourceNodeList, - curStep, - scanOrder, - groupByTimeParameter, - aggregationExpressions, - crossGroupByExpressions, - null, - null); - } - - private AggregationDescriptor createAggregationDescriptor( - FunctionExpression sourceExpression, - AggregationStep curStep, - Ordering scanOrder, - boolean needCheckAscending, - Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations, - Map<PartialPath, List<AggregationDescriptor>> descendingAggregations, - Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations) { - AggregationDescriptor aggregationDescriptor = - new AggregationDescriptor( - sourceExpression.getFunctionName(), - curStep, - sourceExpression.getExpressions(), - sourceExpression.getFunctionAttributes()); - if (curStep.isOutputPartial()) { - updateTypeProviderByPartialAggregation(aggregationDescriptor, context.getTypeProvider()); - } - - if (COUNT_TIME.equalsIgnoreCase(sourceExpression.getFunctionName())) { - Map<String, Pair<List<String>, List<IMeasurementSchema>>> map = new HashMap<>(); - for (Expression expression : sourceExpression.getCountTimeExpressions()) { - TimeSeriesOperand ts = (TimeSeriesOperand) expression; - PartialPath path = ts.getPath(); - Pair<List<String>, List<IMeasurementSchema>> pair = - map.computeIfAbsent( - path.getDevice(), k -> new Pair<>(new ArrayList<>(), new ArrayList<>())); - pair.left.add(path.getMeasurement()); - try { - pair.right.add(path.getMeasurementSchema()); - } catch (MetadataException ex) { - throw new RuntimeException(ex); - } - } - - for (Map.Entry<String, Pair<List<String>, List<IMeasurementSchema>>> entry : map.entrySet()) { - String device = entry.getKey(); - Pair<List<String>, List<IMeasurementSchema>> pair = entry.getValue(); - AlignedPath alignedPath = null; - try { - alignedPath = new AlignedPath(device, pair.left, pair.right); - } catch (IllegalPathException e) { - throw new RuntimeException(e); - } - countTimeAggregations.put(alignedPath, Collections.singletonList(aggregationDescriptor)); - } - - return aggregationDescriptor; - } - - PartialPath selectPath = - ((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath(); - if (!needCheckAscending - || SchemaUtils.isConsistentWithScanOrder( - aggregationDescriptor.getAggregationType(), scanOrder)) { - ascendingAggregations - .computeIfAbsent(selectPath, key -> new ArrayList<>()) - .add(aggregationDescriptor); - } else { - descendingAggregations - .computeIfAbsent(selectPath, key -> new ArrayList<>()) - .add(aggregationDescriptor); - } - return aggregationDescriptor; - } - - private List<PlanNode> constructSourceNodeFromAggregationDescriptors( - Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations, - Map<PartialPath, List<AggregationDescriptor>> descendingAggregations, - Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations, - Ordering scanOrder, - GroupByTimeParameter groupByTimeParameter) { - - List<PlanNode> sourceNodeList = new ArrayList<>(); - boolean needCheckAscending = groupByTimeParameter == null; - Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations = null; - if (!countTimeAggregations.isEmpty()) { - groupedAscendingAggregations = countTimeAggregations; - } else { - groupedAscendingAggregations = MetaUtils.groupAlignedAggregations(ascendingAggregations); - } - - for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry : - groupedAscendingAggregations.entrySet()) { - sourceNodeList.add( - createAggregationScanNode( - pathAggregationsEntry.getKey(), - pathAggregationsEntry.getValue(), - scanOrder, - groupByTimeParameter)); - } - - if (needCheckAscending) { - Map<PartialPath, List<AggregationDescriptor>> groupedDescendingAggregations = - MetaUtils.groupAlignedAggregations(descendingAggregations); - for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry : - groupedDescendingAggregations.entrySet()) { - sourceNodeList.add( - createAggregationScanNode( - pathAggregationsEntry.getKey(), - pathAggregationsEntry.getValue(), - scanOrder.reverse(), - null)); - } - } - return sourceNodeList; - } - - private LogicalPlanBuilder convergeAggregationSource( - List<PlanNode> sourceNodeList, - AggregationStep curStep, - Ordering scanOrder, - GroupByTimeParameter groupByTimeParameter, - Set<Expression> aggregationExpressions, - Map<Expression, Set<Expression>> crossGroupByExpressions, - List<String> tagKeys, - Map<List<String>, LinkedHashMap<Expression, List<Expression>>> - tagValuesToGroupedTimeseriesOperands) { - if (curStep.isOutputPartial()) { - if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) { - curStep = - crossGroupByExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL; - - this.root = convergeWithTimeJoin(sourceNodeList, scanOrder); - - this.root = - createSlidingWindowAggregationNode( - this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder); - - if (crossGroupByExpressions != null) { - curStep = AggregationStep.FINAL; - if (tagKeys != null) { - this.root = - createGroupByTagNode( - tagKeys, - tagValuesToGroupedTimeseriesOperands, - crossGroupByExpressions.keySet(), - Collections.singletonList(this.getRoot()), - curStep, - groupByTimeParameter, - scanOrder); - } else { - this.root = - createGroupByTLevelNode( - Collections.singletonList(this.getRoot()), - crossGroupByExpressions, - curStep, - groupByTimeParameter, - scanOrder); - } - } - } else { - if (tagKeys != null) { - curStep = AggregationStep.FINAL; - this.root = - createGroupByTagNode( - tagKeys, - tagValuesToGroupedTimeseriesOperands, - crossGroupByExpressions.keySet(), - sourceNodeList, - curStep, - groupByTimeParameter, - scanOrder); - } else if (crossGroupByExpressions != null) { - curStep = AggregationStep.FINAL; - this.root = - createGroupByTLevelNode( - sourceNodeList, - crossGroupByExpressions, - curStep, - groupByTimeParameter, - scanOrder); - } - } - } else { - this.root = convergeWithTimeJoin(sourceNodeList, scanOrder); - } - - return this; - } - public static void updateTypeProviderByPartialAggregation( AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) { if (aggregationDescriptor.getAggregationType() == TAggregationType.UDAF) { @@ -977,7 +669,7 @@ public class LogicalPlanBuilder { return this; } - public LogicalPlanBuilder planAggregation( + public LogicalPlanBuilder planRawDataAggregation( Set<Expression> aggregationExpressions, Expression groupByExpression, GroupByTimeParameter groupByTimeParameter, @@ -999,9 +691,9 @@ public class LogicalPlanBuilder { aggregationDescriptor, context.getTypeProvider())); } this.root = - new AggregationNode( + new RawDataAggregationNode( context.getQueryId().genPlanNodeId(), - Collections.singletonList(this.getRoot()), + this.getRoot(), aggregationDescriptorList, groupByTimeParameter, groupByParameter, @@ -1076,85 +768,6 @@ public class LogicalPlanBuilder { scanOrder); } - private PlanNode createGroupByTagNode( - List<String> tagKeys, - Map<List<String>, LinkedHashMap<Expression, List<Expression>>> - tagValuesToGroupedTimeseriesOperands, - Collection<Expression> groupByTagOutputExpressions, - List<PlanNode> children, - AggregationStep curStep, - GroupByTimeParameter groupByTimeParameter, - Ordering scanOrder) { - Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors = - new HashMap<>(); - for (List<String> tagValues : tagValuesToGroupedTimeseriesOperands.keySet()) { - LinkedHashMap<Expression, List<Expression>> groupedTimeseriesOperands = - tagValuesToGroupedTimeseriesOperands.get(tagValues); - List<CrossSeriesAggregationDescriptor> aggregationDescriptors = new ArrayList<>(); - - // Bind an AggregationDescriptor for each GroupByTagOutputExpression - for (Expression groupByTagOutputExpression : groupByTagOutputExpressions) { - boolean added = false; - for (Expression expression : groupedTimeseriesOperands.keySet()) { - if (expression.equals(groupByTagOutputExpression)) { - String functionName = ((FunctionExpression) expression).getFunctionName(); - CrossSeriesAggregationDescriptor aggregationDescriptor = - new CrossSeriesAggregationDescriptor( - functionName, - curStep, - groupedTimeseriesOperands.get(expression), - ((FunctionExpression) expression).getFunctionAttributes(), - expression.getExpressions()); - aggregationDescriptors.add(aggregationDescriptor); - added = true; - break; - } - } - if (!added) { - aggregationDescriptors.add(null); - } - } - tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors); - } - - updateTypeProvider(groupByTagOutputExpressions); - updateTypeProviderWithConstantType(tagKeys, TSDataType.TEXT); - return new GroupByTagNode( - context.getQueryId().genPlanNodeId(), - children, - groupByTimeParameter, - scanOrder, - tagKeys, - tagValuesToAggregationDescriptors, - groupByTagOutputExpressions.stream() - .map(Expression::getExpressionString) - .collect(Collectors.toList())); - } - - private SeriesAggregationSourceNode createAggregationScanNode( - PartialPath selectPath, - List<AggregationDescriptor> aggregationDescriptorList, - Ordering scanOrder, - GroupByTimeParameter groupByTimeParameter) { - if (selectPath instanceof MeasurementPath) { // non-aligned series - return new SeriesAggregationScanNode( - context.getQueryId().genPlanNodeId(), - (MeasurementPath) selectPath, - aggregationDescriptorList, - scanOrder, - groupByTimeParameter); - } else if (selectPath instanceof AlignedPath) { // aligned series - return new AlignedSeriesAggregationScanNode( - context.getQueryId().genPlanNodeId(), - (AlignedPath) selectPath, - aggregationDescriptorList, - scanOrder, - groupByTimeParameter); - } else { - throw new IllegalArgumentException("unexpected path type"); - } - } - private List<AggregationDescriptor> constructAggregationDescriptorList( Set<Expression> aggregationExpressions, AggregationStep curStep) { return aggregationExpressions.stream() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 87c04426010..2c371732f4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -18,18 +18,12 @@ */ package org.apache.iotdb.db.queryengine.plan.planner; -import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; -import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; -import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer; import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; -import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -95,8 +89,6 @@ import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.commons.lang3.StringUtils; - import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -105,7 +97,6 @@ import java.util.Map; import java.util.Set; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME; -import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; /** * This visitor is used to generate a logical plan for the statement and returns the {@link @@ -179,7 +170,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte analysis.getDeviceToGroupByExpression() != null ? analysis.getDeviceToGroupByExpression().get(deviceName) : null, - analysis.getDeviceViewInputIndexesMap().get(deviceName), context)); // order by device, expression, push down sortOperator if (queryStatement.needPushDownSort()) { @@ -210,7 +200,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte analysis.getWhereExpression(), analysis.getAggregationExpressions(), analysis.getGroupByExpression(), - null, context)); } @@ -255,7 +244,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte Expression whereExpression, Set<Expression> aggregationExpressions, Expression groupByExpression, - List<Integer> deviceViewInputIndexes, MPPQueryContext context) { LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); if (aggregationExpressions == null) { @@ -275,96 +263,56 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte queryStatement.getResultTimeOrder()); } else { // aggregation query - boolean isRawDataSource = - analysis.hasValueFilter() - || analysis.hasGroupByParameter() - || needTransform(sourceTransformExpressions) - || cannotUseStatistics(aggregationExpressions, sourceTransformExpressions); - if (queryStatement.isOutputEndTime()) { - context.getTypeProvider().setType(ENDTIME, TSDataType.INT64); - } - AggregationStep curStep; - if (isRawDataSource) { - planBuilder = - planBuilder - .planRawDataSource( - sourceExpressions, - queryStatement.getResultTimeOrder(), - 0, - 0, - analysis.isLastLevelUseWildcard()) - .planWhereAndSourceTransform( - whereExpression, - sourceTransformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getResultTimeOrder()); - - boolean outputPartial = - queryStatement.isGroupByLevel() - || (queryStatement.isGroupByTime() - && analysis.getGroupByTimeParameter().hasOverlap()); - curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE; + planBuilder = + planBuilder + .planRawDataSource( + sourceExpressions, + queryStatement.getResultTimeOrder(), + 0, + 0, + analysis.isLastLevelUseWildcard()) + .planWhereAndSourceTransform( + whereExpression, + sourceTransformExpressions, + queryStatement.isGroupByTime(), + queryStatement.getResultTimeOrder()); + + boolean outputPartial = + queryStatement.isGroupByLevel() + || (queryStatement.isGroupByTime() + && analysis.getGroupByTimeParameter().hasOverlap()); + AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE; + planBuilder = + planBuilder.planRawDataAggregation( + aggregationExpressions, + groupByExpression, + analysis.getGroupByTimeParameter(), + analysis.getGroupByParameter(), + queryStatement.isOutputEndTime(), + curStep, + queryStatement.getResultTimeOrder()); + + if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { + curStep = + queryStatement.isGroupByLevel() ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL; planBuilder = - planBuilder.planAggregation( + planBuilder.planSlidingWindowAggregation( aggregationExpressions, - groupByExpression, analysis.getGroupByTimeParameter(), - analysis.getGroupByParameter(), - queryStatement.isOutputEndTime(), curStep, queryStatement.getResultTimeOrder()); + } - if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { - curStep = - queryStatement.isGroupByLevel() - ? AggregationStep.INTERMEDIATE - : AggregationStep.FINAL; - planBuilder = - planBuilder.planSlidingWindowAggregation( - aggregationExpressions, - analysis.getGroupByTimeParameter(), - curStep, - queryStatement.getResultTimeOrder()); - } - - if (queryStatement.isGroupByLevel()) { - planBuilder = - planBuilder.planGroupByLevel( - analysis.getCrossGroupByExpressions(), - analysis.getGroupByTimeParameter(), - queryStatement.getResultTimeOrder()); - } - } else { - curStep = - (analysis.getCrossGroupByExpressions() != null - || (analysis.getGroupByTimeParameter() != null - && analysis.getGroupByTimeParameter().hasOverlap())) - ? AggregationStep.PARTIAL - : AggregationStep.SINGLE; - + if (queryStatement.isGroupByLevel()) { planBuilder = - deviceViewInputIndexes == null - ? planBuilder.planAggregationSource( - curStep, - queryStatement.getResultTimeOrder(), - analysis.getGroupByTimeParameter(), - aggregationExpressions, - sourceTransformExpressions, - analysis.getCrossGroupByExpressions(), - analysis.getTagKeys(), - analysis.getTagValuesToGroupedTimeseriesOperands()) - : planBuilder.planAggregationSourceWithIndexAdjust( - curStep, - queryStatement.getResultTimeOrder(), - analysis.getGroupByTimeParameter(), - aggregationExpressions, - sourceTransformExpressions, - analysis.getCrossGroupByExpressions(), - deviceViewInputIndexes, - queryStatement.isOutputEndTime()); + planBuilder.planGroupByLevel( + analysis.getCrossGroupByExpressions(), + analysis.getGroupByTimeParameter(), + queryStatement.getResultTimeOrder()); } if (queryStatement.isGroupByTime() && queryStatement.isOutputEndTime()) { + context.getTypeProvider().setType(ENDTIME, TSDataType.INT64); planBuilder = planBuilder.planEndTimeColumnInject( analysis.getGroupByTimeParameter(), @@ -395,55 +343,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte return 0; } - private boolean needTransform(Set<Expression> expressions) { - for (Expression expression : expressions) { - if (ExpressionAnalyzer.checkIsNeedTransform(expression)) { - return true; - } - } - return false; - } - - private boolean cannotUseStatistics( - Set<Expression> expressions, Set<Expression> sourceTransformExpressions) { - for (Expression expression : expressions) { - - if (expression instanceof FunctionExpression) { - FunctionExpression functionExpression = (FunctionExpression) expression; - // Disable statistics optimization of UDAF for now - if (functionExpression.isExternalAggregationFunctionExpression()) { - return true; - } - - if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) { - String alignedDeviceId = ""; - for (Expression countTimeExpression : sourceTransformExpressions) { - TimeSeriesOperand ts = (TimeSeriesOperand) countTimeExpression; - if (!(ts.getPath() instanceof AlignedPath - || ((MeasurementPath) ts.getPath()).isUnderAlignedEntity())) { - return true; - } - if (StringUtils.isEmpty(alignedDeviceId)) { - alignedDeviceId = ts.getPath().getDevice(); - } else if (!alignedDeviceId.equalsIgnoreCase(ts.getPath().getDevice())) { - // count_time from only one aligned device can use AlignedSeriesAggScan - return true; - } - } - return false; - } - - if (!BuiltinAggregationFunction.canUseStatistics(functionExpression.getFunctionName())) { - return true; - } - } else { - throw new IllegalArgumentException( - String.format("Invalid Aggregation Expression: %s", expression.getExpressionString())); - } - } - return false; - } - @Override public PlanNode visitCreateTimeseries( CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java index 78d939f019f..5f460f0267d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java @@ -21,12 +21,13 @@ package org.apache.iotdb.db.queryengine.plan.planner; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.optimization.AggregationPushDown; import org.apache.iotdb.db.queryengine.plan.optimization.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.optimization.PredicatePushDown; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.LOGICAL_PLANNER; @@ -35,7 +36,8 @@ import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.LOGI public class LogicalPlanner { private final MPPQueryContext context; - private final List<PlanOptimizer> optimizers = Collections.singletonList(new PredicatePushDown()); + private final List<PlanOptimizer> optimizers = + Arrays.asList(new PredicatePushDown(), new AggregationPushDown()); public LogicalPlanner(MPPQueryContext context) { this.context = context; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index a6d043f5ad2..9517179c98a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -192,6 +192,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; @@ -1753,11 +1754,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } @Override - public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) { + public Operator visitRawDataAggregation( + RawDataAggregationNode node, LocalExecutionPlanContext context) { checkArgument( !node.getAggregationDescriptorList().isEmpty(), "Aggregation descriptorList cannot be empty"); - List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context); + Operator child = node.getChild().accept(this, context); boolean ascending = node.getScanOrder() == Ordering.ASC; List<Aggregator> aggregators = new ArrayList<>(); Map<String, List<InputLocation>> layout = makeLayout(node); @@ -1779,128 +1781,154 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP descriptor.getStep(), inputLocationList)); } - boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw(); + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); GroupByParameter groupByParameter = node.getGroupByParameter(); - if (inputRaw) { - checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input"); - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - RawDataAggregationOperator.class.getSimpleName()); - - ITimeRangeIterator timeRangeIterator = - initTimeRangeIterator(groupByTimeParameter, ascending, true); - long maxReturnSize = - calculateMaxAggregationResultSize( - aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); - - // groupByParameter and groupByTimeParameter - if (groupByParameter != null) { - WindowType windowType = groupByParameter.getWindowType(); - - WindowParameter windowParameter; - switch (windowType) { - case VARIATION_WINDOW: - Expression groupByVariationExpression = node.getGroupByExpression(); - if (groupByVariationExpression == null) { - throw new IllegalArgumentException("groupByVariationExpression can't be null"); - } - String controlColumn = groupByVariationExpression.getExpressionString(); - TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn); - windowParameter = - new VariationWindowParameter( - controlColumnType, - layout.get(controlColumn).get(0).getValueColumnIndex(), - node.isOutputEndTime(), - ((GroupByVariationParameter) groupByParameter).isIgnoringNull(), - ((GroupByVariationParameter) groupByParameter).getDelta()); - break; - case CONDITION_WINDOW: - Expression groupByConditionExpression = node.getGroupByExpression(); - if (groupByConditionExpression == null) { - throw new IllegalArgumentException("groupByConditionExpression can't be null"); - } - windowParameter = - new ConditionWindowParameter( - node.isOutputEndTime(), - ((GroupByConditionParameter) groupByParameter).isIgnoringNull(), - layout - .get(groupByConditionExpression.getExpressionString()) - .get(0) - .getValueColumnIndex(), - ((GroupByConditionParameter) groupByParameter).getKeepExpression()); - break; - case SESSION_WINDOW: - windowParameter = - new SessionWindowParameter( - ((GroupBySessionParameter) groupByParameter).getTimeInterval(), - node.isOutputEndTime()); - break; - case COUNT_WINDOW: - Expression groupByCountExpression = node.getGroupByExpression(); - if (groupByCountExpression == null) { - throw new IllegalArgumentException("groupByCountExpression can't be null"); - } - windowParameter = - new CountWindowParameter( - ((GroupByCountParameter) groupByParameter).getCountNumber(), - layout - .get(groupByCountExpression.getExpressionString()) - .get(0) - .getValueColumnIndex(), - node.isOutputEndTime(), - ((GroupByCountParameter) groupByParameter).isIgnoreNull()); - break; - default: - throw new IllegalArgumentException("Unsupported window type"); - } - return new RawDataAggregationOperator( - operatorContext, - aggregators, - timeRangeIterator, - children.get(0), - ascending, - maxReturnSize, - windowParameter); - } + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + RawDataAggregationOperator.class.getSimpleName()); + + ITimeRangeIterator timeRangeIterator = + initTimeRangeIterator(groupByTimeParameter, ascending, true); + long maxReturnSize = + calculateMaxAggregationResultSize( + aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); - WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime()); + // groupByParameter and groupByTimeParameter + if (groupByParameter != null) { + WindowType windowType = groupByParameter.getWindowType(); + + WindowParameter windowParameter; + switch (windowType) { + case VARIATION_WINDOW: + Expression groupByVariationExpression = node.getGroupByExpression(); + if (groupByVariationExpression == null) { + throw new IllegalArgumentException("groupByVariationExpression can't be null"); + } + String controlColumn = groupByVariationExpression.getExpressionString(); + TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn); + windowParameter = + new VariationWindowParameter( + controlColumnType, + layout.get(controlColumn).get(0).getValueColumnIndex(), + node.isOutputEndTime(), + ((GroupByVariationParameter) groupByParameter).isIgnoringNull(), + ((GroupByVariationParameter) groupByParameter).getDelta()); + break; + case CONDITION_WINDOW: + Expression groupByConditionExpression = node.getGroupByExpression(); + if (groupByConditionExpression == null) { + throw new IllegalArgumentException("groupByConditionExpression can't be null"); + } + windowParameter = + new ConditionWindowParameter( + node.isOutputEndTime(), + ((GroupByConditionParameter) groupByParameter).isIgnoringNull(), + layout + .get(groupByConditionExpression.getExpressionString()) + .get(0) + .getValueColumnIndex(), + ((GroupByConditionParameter) groupByParameter).getKeepExpression()); + break; + case SESSION_WINDOW: + windowParameter = + new SessionWindowParameter( + ((GroupBySessionParameter) groupByParameter).getTimeInterval(), + node.isOutputEndTime()); + break; + case COUNT_WINDOW: + Expression groupByCountExpression = node.getGroupByExpression(); + if (groupByCountExpression == null) { + throw new IllegalArgumentException("groupByCountExpression can't be null"); + } + windowParameter = + new CountWindowParameter( + ((GroupByCountParameter) groupByParameter).getCountNumber(), + layout + .get(groupByCountExpression.getExpressionString()) + .get(0) + .getValueColumnIndex(), + node.isOutputEndTime(), + ((GroupByCountParameter) groupByParameter).isIgnoreNull()); + break; + default: + throw new IllegalArgumentException("Unsupported window type"); + } return new RawDataAggregationOperator( operatorContext, aggregators, timeRangeIterator, - children.get(0), + child, ascending, maxReturnSize, windowParameter); - } else { - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - AggregationOperator.class.getSimpleName()); + } - ITimeRangeIterator timeRangeIterator = - initTimeRangeIterator(groupByTimeParameter, ascending, true); - long maxReturnSize = - calculateMaxAggregationResultSize( - aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); + WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime()); + return new RawDataAggregationOperator( + operatorContext, + aggregators, + timeRangeIterator, + child, + ascending, + maxReturnSize, + windowParameter); + } - return new AggregationOperator( - operatorContext, - aggregators, - timeRangeIterator, - children, - node.isOutputEndTime(), - maxReturnSize); + @Override + public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) { + checkArgument( + !node.getAggregationDescriptorList().isEmpty(), + "Aggregation descriptorList cannot be empty"); + List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context); + boolean ascending = node.getScanOrder() == Ordering.ASC; + List<Aggregator> aggregators = new ArrayList<>(); + Map<String, List<InputLocation>> layout = makeLayout(node); + List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList(); + for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) { + List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); + aggregators.add( + new Aggregator( + AccumulatorFactory.createAccumulator( + descriptor.getAggregationFuncName(), + descriptor.getAggregationType(), + descriptor.getInputExpressions().stream() + .map(x -> context.getTypeProvider().getType(x.getExpressionString())) + .collect(Collectors.toList()), + descriptor.getInputExpressions(), + descriptor.getInputAttributes(), + ascending, + descriptor.getStep().isInputRaw()), + descriptor.getStep(), + inputLocationList)); } + + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + AggregationOperator.class.getSimpleName()); + + ITimeRangeIterator timeRangeIterator = + initTimeRangeIterator(node.getGroupByTimeParameter(), ascending, true); + long maxReturnSize = + calculateMaxAggregationResultSize( + aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); + + return new AggregationOperator( + operatorContext, + aggregators, + timeRangeIterator, + children, + node.isOutputEndTime(), + maxReturnSize); } private List<InputLocation[]> calcInputLocationList( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index 3ae7a3f126f..958e85592ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -46,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; @@ -381,6 +382,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processOneChildNode(node, context); } + @Override + public PlanNode visitRawDataAggregation(RawDataAggregationNode node, NodeGroupContext context) { + return processOneChildNode(node, context); + } + @Override public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, NodeGroupContext context) { ExplainAnalyzeNode newNode = (ExplainAnalyzeNode) node.clone(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index bda66843a97..c96d87573b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; @@ -209,6 +210,8 @@ public enum PlanNodeType { EXPLAIN_ANALYZE((short) 90), PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91), + + RAW_DATA_AGGREGATION((short) 92), ; public static final int BYTES = Short.BYTES; @@ -440,6 +443,8 @@ public enum PlanNodeType { return ExplainAnalyzeNode.deserialize(buffer); case 91: return PipeOperateSchemaQueueNode.deserialize(buffer); + case 92: + return RawDataAggregationNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 608ca14df24..932a9d22597 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -76,6 +76,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; @@ -222,6 +223,10 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } + public R visitRawDataAggregation(RawDataAggregationNode node, C context) { + return visitSingleChildProcess(node, context); + } + // two child ----------------------------------------------------------------------------------- public R visitTwoChildProcess(TwoChildProcessNode node, C context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java new file mode 100644 index 00000000000..ab2683743a3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process; + +import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import javax.annotation.Nullable; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode.getDeduplicatedDescriptors; + +public class RawDataAggregationNode extends SingleChildProcessNode { + + // The list of aggregate functions, each AggregateDescriptor will be output as one or two column + // of result TsBlock + protected List<AggregationDescriptor> aggregationDescriptorList; + + // The parameter of `group by time`. + // Its value will be null if there is no `group by time` clause. + @Nullable protected GroupByTimeParameter groupByTimeParameter; + + // The parameter of `group by`. + // Its value will be null if there is no `group by` clause. + @Nullable protected GroupByParameter groupByParameter; + + // In some situation of `group by` clause, groupByExpression is required. + // It will be null if the clause doesn't refer to any expression. + protected Expression groupByExpression; + + protected Ordering scanOrder; + + protected boolean outputEndTime = false; + + public RawDataAggregationNode( + PlanNodeId id, + List<AggregationDescriptor> aggregationDescriptorList, + @Nullable GroupByTimeParameter groupByTimeParameter, + Ordering scanOrder) { + super(id); + this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList); + this.groupByTimeParameter = groupByTimeParameter; + this.scanOrder = scanOrder; + } + + public RawDataAggregationNode( + PlanNodeId id, + PlanNode child, + List<AggregationDescriptor> aggregationDescriptorList, + @Nullable GroupByTimeParameter groupByTimeParameter, + Ordering scanOrder) { + super(id, child); + this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList); + this.groupByTimeParameter = groupByTimeParameter; + this.scanOrder = scanOrder; + } + + public RawDataAggregationNode( + PlanNodeId id, + List<AggregationDescriptor> aggregationDescriptorList, + @Nullable GroupByTimeParameter groupByTimeParameter, + @Nullable GroupByParameter groupByParameter, + Expression groupByExpression, + boolean outputEndTime, + Ordering scanOrder) { + super(id); + this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList); + this.groupByTimeParameter = groupByTimeParameter; + this.scanOrder = scanOrder; + this.groupByParameter = groupByParameter; + this.groupByExpression = groupByExpression; + this.outputEndTime = outputEndTime; + } + + public RawDataAggregationNode( + PlanNodeId id, + PlanNode child, + List<AggregationDescriptor> aggregationDescriptorList, + @Nullable GroupByTimeParameter groupByTimeParameter, + @Nullable GroupByParameter groupByParameter, + Expression groupByExpression, + boolean outputEndTime, + Ordering scanOrder) { + super(id, child); + this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList); + this.scanOrder = scanOrder; + this.groupByParameter = groupByParameter; + this.groupByTimeParameter = groupByTimeParameter; + this.groupByExpression = groupByExpression; + this.outputEndTime = outputEndTime; + } + + public List<AggregationDescriptor> getAggregationDescriptorList() { + return aggregationDescriptorList; + } + + @Nullable + public GroupByTimeParameter getGroupByTimeParameter() { + return groupByTimeParameter; + } + + @Nullable + public GroupByParameter getGroupByParameter() { + return groupByParameter; + } + + public Ordering getScanOrder() { + return scanOrder; + } + + public boolean isOutputEndTime() { + return outputEndTime; + } + + public void setOutputEndTime(boolean outputEndTime) { + this.outputEndTime = outputEndTime; + } + + @Nullable + public Expression getGroupByExpression() { + return groupByExpression; + } + + @Override + public PlanNodeType getType() { + return PlanNodeType.RAW_DATA_AGGREGATION; + } + + @Override + public PlanNode clone() { + return new RawDataAggregationNode( + getPlanNodeId(), + getAggregationDescriptorList(), + getGroupByTimeParameter(), + getGroupByParameter(), + getGroupByExpression(), + outputEndTime, + getScanOrder()); + } + + @Override + public List<String> getOutputColumnNames() { + List<String> outputColumnNames = new ArrayList<>(); + if (outputEndTime) { + outputColumnNames.add(ColumnHeaderConstant.ENDTIME); + } + outputColumnNames.addAll( + aggregationDescriptorList.stream() + .map(AggregationDescriptor::getOutputColumnNames) + .flatMap(List::stream) + .collect(Collectors.toList())); + + return outputColumnNames; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitRawDataAggregation(this, context); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.RAW_DATA_AGGREGATION.serialize(byteBuffer); + ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer); + for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) { + aggregationDescriptor.serialize(byteBuffer); + } + if (groupByTimeParameter == null) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else { + ReadWriteIOUtils.write((byte) 1, byteBuffer); + groupByTimeParameter.serialize(byteBuffer); + } + if (groupByParameter == null) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else { + ReadWriteIOUtils.write((byte) 1, byteBuffer); + groupByParameter.serialize(byteBuffer); + } + if (groupByExpression == null) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else { + ReadWriteIOUtils.write((byte) 1, byteBuffer); + Expression.serialize(groupByExpression, byteBuffer); + } + ReadWriteIOUtils.write(outputEndTime, byteBuffer); + ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.RAW_DATA_AGGREGATION.serialize(stream); + ReadWriteIOUtils.write(aggregationDescriptorList.size(), stream); + for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) { + aggregationDescriptor.serialize(stream); + } + if (groupByTimeParameter == null) { + ReadWriteIOUtils.write((byte) 0, stream); + } else { + ReadWriteIOUtils.write((byte) 1, stream); + groupByTimeParameter.serialize(stream); + } + if (groupByParameter == null) { + ReadWriteIOUtils.write((byte) 0, stream); + } else { + ReadWriteIOUtils.write((byte) 1, stream); + groupByParameter.serialize(stream); + } + if (groupByExpression == null) { + ReadWriteIOUtils.write((byte) 0, stream); + } else { + ReadWriteIOUtils.write((byte) 1, stream); + Expression.serialize(groupByExpression, stream); + } + ReadWriteIOUtils.write(outputEndTime, stream); + ReadWriteIOUtils.write(scanOrder.ordinal(), stream); + } + + public static RawDataAggregationNode deserialize(ByteBuffer byteBuffer) { + int descriptorSize = ReadWriteIOUtils.readInt(byteBuffer); + List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>(); + while (descriptorSize > 0) { + aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer)); + descriptorSize--; + } + byte isNull = ReadWriteIOUtils.readByte(byteBuffer); + GroupByTimeParameter groupByTimeParameter = null; + if (isNull == 1) { + groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer); + } + isNull = ReadWriteIOUtils.readByte(byteBuffer); + GroupByParameter groupByParameter = null; + if (isNull == 1) { + groupByParameter = GroupByParameter.deserialize(byteBuffer); + } + isNull = ReadWriteIOUtils.readByte(byteBuffer); + Expression groupByExpression = null; + if (isNull == 1) { + groupByExpression = Expression.deserialize(byteBuffer); + } + boolean outputEndTime = ReadWriteIOUtils.readBool(byteBuffer); + Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new RawDataAggregationNode( + planNodeId, + aggregationDescriptorList, + groupByTimeParameter, + groupByParameter, + groupByExpression, + outputEndTime, + scanOrder); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + RawDataAggregationNode that = (RawDataAggregationNode) o; + return Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList) + && Objects.equals(groupByTimeParameter, that.groupByTimeParameter) + && Objects.equals(groupByParameter, that.groupByParameter) + && Objects.equals(groupByExpression, that.groupByExpression) + && Objects.equals(outputEndTime, that.outputEndTime) + && scanOrder == that.scanOrder; + } + + @Override + public int hashCode() { + return Objects.hash( + super.hashCode(), + aggregationDescriptorList, + groupByTimeParameter, + groupByParameter, + groupByExpression, + outputEndTime, + scanOrder); + } + + public String toString() { + return String.format("RawDataAggregationNode-%s", getPlanNodeId()); + } +}
