This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch moveSomeClassToCommonModule in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ebd598cd36510ffadc307ef407edd84be429e7b7 Author: shuwenwei <[email protected]> AuthorDate: Thu Jun 11 15:05:01 2026 +0800 fix --- .../calc/plan/planner/TableOperatorGenerator.java | 98 ++++++++++++++++++++-- 1 file changed, 90 insertions(+), 8 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java index a59016030e7..c5bf2e25094 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java @@ -422,7 +422,7 @@ public abstract class TableOperatorGenerator< context); } - private List<TSDataType> getInputColumnTypes(PlanNode node, ITableTypeProvider typeProvider) { + protected List<TSDataType> getInputColumnTypes(PlanNode node, ITableTypeProvider typeProvider) { // ignore "time" column return node.getChildren().stream() .map(PlanNode::getOutputSymbols) @@ -1304,7 +1304,7 @@ public abstract class TableOperatorGenerator< return planGroupByAggregation(node, child, context.getTableTypeProvider(), context); } - private Operator planGlobalAggregation( + protected AggregationOperator planGlobalAggregation( AggregationNode node, Operator child, ITableTypeProvider typeProvider, C context) { CommonOperatorContext operatorContext = addOperatorContext( @@ -1328,7 +1328,7 @@ public abstract class TableOperatorGenerator< false, null, Collections.emptySet()))); - return new AggregationOperator(operatorContext, child, aggregatorBuilder.build()); + return createAggregationOperator(operatorContext, child, aggregatorBuilder.build()); } // timeColumnName and measurementColumnNames will only be set for AggTableScan. @@ -1411,7 +1411,7 @@ public abstract class TableOperatorGenerator< CommonOperatorContext operatorContext = addOperatorContext( context, node.getPlanNodeId(), StreamingAggregationOperator.class.getSimpleName()); - return new StreamingAggregationOperator( + return createStreamingAggregationOperator( operatorContext, child, groupByTypes, @@ -1457,7 +1457,7 @@ public abstract class TableOperatorGenerator< context, node.getPlanNodeId(), StreamingHashAggregationOperator.class.getSimpleName()); - return new StreamingHashAggregationOperator( + return createStreamingHashAggregationOperator( operatorContext, child, preGroupedChannels, @@ -1484,7 +1484,7 @@ public abstract class TableOperatorGenerator< addOperatorContext( context, node.getPlanNodeId(), HashAggregationOperator.class.getSimpleName()); - return new HashAggregationOperator( + return createHashAggregationOperator( operatorContext, child, groupByTypes, @@ -1497,6 +1497,89 @@ public abstract class TableOperatorGenerator< Long.MAX_VALUE); } + protected AggregationOperator createAggregationOperator( + CommonOperatorContext operatorContext, Operator child, List<TableAggregator> aggregators) { + return new AggregationOperator(operatorContext, child, aggregators); + } + + protected StreamingAggregationOperator createStreamingAggregationOperator( + CommonOperatorContext operatorContext, + Operator child, + List<Type> groupByTypes, + List<Integer> groupByChannels, + Comparator<SortKey> groupKeyComparator, + List<TableAggregator> aggregators, + long maxPartialMemory, + boolean spillEnabled, + long unSpillMemoryLimit) { + return new StreamingAggregationOperator( + operatorContext, + child, + groupByTypes, + groupByChannels, + groupKeyComparator, + aggregators, + maxPartialMemory, + spillEnabled, + unSpillMemoryLimit); + } + + protected StreamingHashAggregationOperator createStreamingHashAggregationOperator( + CommonOperatorContext operatorContext, + Operator child, + List<Integer> preGroupedChannels, + List<Integer> preGroupedIndexInResult, + List<Type> unPreGroupedTypes, + List<Integer> unPreGroupedChannels, + List<Integer> unPreGroupedIndexInResult, + Comparator<SortKey> groupKeyComparator, + List<GroupedAggregator> aggregators, + AggregationNode.Step step, + int expectedGroups, + long maxPartialMemory, + boolean spillEnabled, + long unSpillMemoryLimit) { + return new StreamingHashAggregationOperator( + operatorContext, + child, + preGroupedChannels, + preGroupedIndexInResult, + unPreGroupedTypes, + unPreGroupedChannels, + unPreGroupedIndexInResult, + groupKeyComparator, + aggregators, + step, + expectedGroups, + maxPartialMemory, + spillEnabled, + unSpillMemoryLimit); + } + + protected HashAggregationOperator createHashAggregationOperator( + CommonOperatorContext operatorContext, + Operator child, + List<Type> groupByTypes, + List<Integer> groupByChannels, + List<GroupedAggregator> aggregators, + AggregationNode.Step step, + int expectedGroups, + long maxPartialMemory, + boolean spillEnabled, + long unSpillMemoryLimit) { + return new HashAggregationOperator( + operatorContext, + child, + groupByTypes, + groupByChannels, + aggregators, + step, + expectedGroups, + maxPartialMemory, + spillEnabled, + unSpillMemoryLimit); + } + protected Comparator<SortKey> genGroupKeyComparator( List<Type> groupTypes, List<Integer> groupByChannels) { return getComparatorForTable( @@ -2248,8 +2331,7 @@ public abstract class TableOperatorGenerator< public Operator visitRowNumber(RowNumberNode node, C context) { Operator child = node.getChild().accept(this, context); CommonOperatorContext operatorContext = - addOperatorContext( - context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), RowNumberOperator.class.getSimpleName()); List<Symbol> partitionBySymbols = node.getPartitionBy(); Map<Symbol, Integer> childLayout =
