This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch max_by in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11a1b83e75ba6bc8bc2516be3c1a05b635031924 Author: lancelly <[email protected]> AuthorDate: Thu Jan 18 22:43:23 2024 +0800 Draft FE for multiInput aggregation --- .../constant/BuiltinAggregationFunctionEnum.java | 1 + .../java/org/apache/iotdb/tool/ExportTsFile.java | 3 +- .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 1 + .../sql/factory/IoTDBDynamicTableFactory.java | 1 + .../protocol/thrift/impl/ClientRPCServiceImpl.java | 3 +- .../execution/aggregation/AccumulatorFactory.java | 43 +++++++++++++++++++++- .../execution/aggregation/Aggregator.java | 17 +++++---- .../execution/aggregation/MaxByAccumulator.java | 4 ++ .../SlidingWindowAggregatorFactory.java | 10 ++--- .../plan/analyze/ExpressionTypeAnalyzer.java | 37 ++++++++++++++++++- .../cartesian/BindSchemaForExpressionVisitor.java | 7 ++-- .../ConcatExpressionWithSuffixPathsVisitor.java | 23 ++++++------ .../db/queryengine/plan/parser/ASTVisitor.java | 1 + .../plan/planner/OperatorTreeGenerator.java | 41 ++++++++++----------- .../plan/parameter/AggregationDescriptor.java | 15 +++++++- .../CrossSeriesAggregationDescriptor.java | 11 +++++- .../org/apache/iotdb/db/utils/SchemaUtils.java | 4 ++ .../apache/iotdb/db/utils/TypeInferenceUtils.java | 3 ++ .../iotdb/db/utils/constant/SqlConstant.java | 3 ++ .../execution/aggregation/AccumulatorTest.java | 34 ++++++++--------- .../AlignedSeriesAggregationScanOperatorTest.java | 22 +++++------ .../execution/operator/OperatorMemoryTest.java | 8 ++-- .../SlidingWindowAggregationOperatorTest.java | 2 +- .../udf/builtin/BuiltinAggregationFunction.java | 5 ++- .../thrift-commons/src/main/thrift/common.thrift | 3 +- 25 files changed, 209 insertions(+), 93 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java index a5d73836715..b1524341812 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java @@ -41,6 +41,7 @@ public enum BuiltinAggregationFunctionEnum { COUNT("count"), AVG("avg"), SUM("sum"), + MAX_BY("max_by") ; private final String functionName; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java index 9c9329ead7a..4b69013b12d 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java @@ -178,7 +178,8 @@ public class ExportTsFile extends AbstractTsFileTool { || sqlLower.contains("stddev_samp(") || sqlLower.contains("variance(") || sqlLower.contains("var_pop(") - || sqlLower.contains("var_samp(")) { + || sqlLower.contains("var_samp(") + || sqlLower.contains("max_by(")) { IoTPrinter.println("The sql you entered is invalid, please don't use aggregate query."); System.exit(CODE_ERROR); } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java index 1d77ab3d072..2d3f661c670 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java @@ -220,6 +220,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { "LZMA2", "LATEST", "LIKE", + "MAX_BY", "METADATA", "MERGE", "MOVE", diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java index c32de37733e..f5305180e95 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java @@ -202,6 +202,7 @@ public class IoTDBDynamicTableFactory || sqlLower.contains("last_value(") || sqlLower.contains("max_time(") || sqlLower.contains("min_time(") + || sqlLower.contains("max_by(") || sqlLower.contains("stddev(") || sqlLower.contains("stddev_pop(") || sqlLower.contains("stddev_samp(") diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 645159b75ae..8c21de33986 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -669,7 +669,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Aggregator aggregator = new Aggregator( - AccumulatorFactory.createAccumulator(aggregationType, dataType, null, null, true), + AccumulatorFactory.createAccumulator( + aggregationType, Collections.singletonList(dataType), null, null, true), AggregationStep.SINGLE, Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)})); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java index 40d0a2d9bcd..9c0b7ce7fcb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java @@ -20,18 +20,34 @@ package org.apache.iotdb.db.queryengine.execution.aggregation; import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.binary.CompareBinaryExpression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import static com.google.common.base.Preconditions.checkState; + public class AccumulatorFactory { public static Accumulator createAccumulator( + TAggregationType aggregationType, + List<TSDataType> inputDataTypes, + List<Expression> inputExpressions, + Map<String, String> inputAttributes, + boolean ascending) { + return isMultiInputAggregation(aggregationType) + ? createAccumulatorWithMultiInput(aggregationType, inputDataTypes) + : createSingleInputAccumulator( + aggregationType, inputDataTypes.get(0), inputExpressions, inputAttributes, ascending); + } + + private static Accumulator createSingleInputAccumulator( TAggregationType aggregationType, TSDataType tsDataType, List<Expression> inputExpressions, @@ -106,6 +122,27 @@ public class AccumulatorFactory { } } + public static Accumulator createAccumulatorWithMultiInput( + TAggregationType aggregationType, List<TSDataType> inputDataTypes) { + switch (aggregationType) { + case MAX_BY: + checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); + return new MaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + default: + throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); + } + } + + private static boolean isMultiInputAggregation(TAggregationType aggregationType) { + switch (aggregationType) { + case MAX_BY: + return true; + default: + return false; + } + } + + @TestOnly public static List<Accumulator> createAccumulators( List<TAggregationType> aggregationTypes, TSDataType tsDataType, @@ -116,7 +153,11 @@ public class AccumulatorFactory { for (TAggregationType aggregationType : aggregationTypes) { accumulators.add( createAccumulator( - aggregationType, tsDataType, inputExpressions, inputAttributes, ascending)); + aggregationType, + Collections.singletonList(tsDataType), + inputExpressions, + inputAttributes, + ascending)); } return accumulators; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java index f6f46c5dd0b..e99311b8d21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java @@ -68,17 +68,20 @@ public class Aggregator { checkArgument( step.isInputRaw(), "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); - for (InputLocation[] inputLocations : inputLocationList) { + checkArgument( + inputLocationList.size() == 1, + "inputLocationList.size() in SeriesAggregateScanOperator and RawDataAggregateOperator can only be 1."); + Column[] timeAndValueColumn = new Column[1 + inputLocationList.get(0).length]; + timeAndValueColumn[0] = tsBlock.getTimeColumn(); + for (int i = 0; i < inputLocationList.get(0).length; i++) { checkArgument( - inputLocations[0].getTsBlockIndex() == 0, + inputLocationList.get(0)[i].getTsBlockIndex() == 0, "RawDataAggregateOperator can only process one tsBlock input."); - Column[] timeAndValueColumn = new Column[2]; - timeAndValueColumn[0] = tsBlock.getTimeColumn(); - int index = inputLocations[0].getValueColumnIndex(); + int index = inputLocationList.get(0)[i].getValueColumnIndex(); // for count_time, time column is also its value column - timeAndValueColumn[1] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); - accumulator.addInput(timeAndValueColumn, bitMap, lastIndex); + timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); } + accumulator.addInput(timeAndValueColumn, bitMap, lastIndex); } finally { QUERY_EXECUTION_METRICS.recordExecutionCost( AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java index d393e8507cb..72a00c98b78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java @@ -223,6 +223,7 @@ public class MaxByAccumulator implements Accumulator { private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) { if (!initResult || yMaxVal > yMaxValue.getInt()) { initResult = true; + yMaxValue.setInt(yMaxVal); updateX(xColumn, xIndex); } } @@ -241,6 +242,7 @@ public class MaxByAccumulator implements Accumulator { private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) { if (!initResult || yMaxVal > yMaxValue.getLong()) { initResult = true; + yMaxValue.setLong(yMaxVal); updateX(xColumn, xIndex); } } @@ -259,6 +261,7 @@ public class MaxByAccumulator implements Accumulator { private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) { if (!initResult || yMaxVal > yMaxValue.getFloat()) { initResult = true; + yMaxValue.setFloat(yMaxVal); updateX(xColumn, xIndex); } } @@ -277,6 +280,7 @@ public class MaxByAccumulator implements Accumulator { private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) { if (!initResult || yMaxVal > yMaxValue.getDouble()) { initResult = true; + yMaxValue.setDouble(yMaxVal); updateX(xColumn, xIndex); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java index ac01208d36f..53e477be245 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java @@ -119,7 +119,7 @@ public class SlidingWindowAggregatorFactory { public static SlidingWindowAggregator createSlidingWindowAggregator( TAggregationType aggregationType, - TSDataType dataType, + List<TSDataType> dataTypes, List<Expression> inputExpressions, Map<String, String> inputAttributes, boolean ascending, @@ -127,7 +127,7 @@ public class SlidingWindowAggregatorFactory { AggregationStep step) { Accumulator accumulator = AccumulatorFactory.createAccumulator( - aggregationType, dataType, inputExpressions, inputAttributes, ascending); + aggregationType, dataTypes, inputExpressions, inputAttributes, ascending); switch (aggregationType) { case SUM: case AVG: @@ -142,13 +142,13 @@ public class SlidingWindowAggregatorFactory { return new SmoothQueueSlidingWindowAggregator(accumulator, inputLocationList, step); case MAX_VALUE: return new MonotonicQueueSlidingWindowAggregator( - accumulator, inputLocationList, step, maxComparators.get(dataType)); + accumulator, inputLocationList, step, maxComparators.get(dataTypes.get(0))); case MIN_VALUE: return new MonotonicQueueSlidingWindowAggregator( - accumulator, inputLocationList, step, minComparators.get(dataType)); + accumulator, inputLocationList, step, minComparators.get(dataTypes.get(0))); case EXTREME: return new MonotonicQueueSlidingWindowAggregator( - accumulator, inputLocationList, step, extremeComparators.get(dataType)); + accumulator, inputLocationList, step, extremeComparators.get(dataTypes.get(0))); case MIN_TIME: case FIRST_VALUE: return !ascending diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java index 68a96b29bf8..8f4c1d6fe63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.unary.RegularExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFInformationInferrer; import org.apache.iotdb.db.utils.TypeInferenceUtils; +import org.apache.iotdb.db.utils.constant.SqlConstant; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -284,7 +285,8 @@ public class ExpressionTypeAnalyzer { functionExpression, TypeInferenceUtils.getAggrDataType( functionExpression.getFunctionName(), - expressionTypes.get(NodeRef.of(inputExpressions.get(0))))); + getInputExpressionTypeForAggregation( + inputExpressions, functionExpression.getFunctionName()))); } if (functionExpression.isBuiltInScalarFunction()) { return setExpressionType( @@ -398,4 +400,37 @@ public class ExpressionTypeAnalyzer { expressionString, actual.name(), Arrays.toString(expected))); } } + + private TSDataType getInputExpressionTypeForAggregation( + List<Expression> inputExpressions, String aggregateFunctionName) { + // Some aggregate functions have a fixed output type, while others determine their output type + // based on the data type of their input. + // Currently, for all aggregate functions without a fixed output type, the output type is + // determined by the first input. + switch (aggregateFunctionName) { + case SqlConstant.MIN_TIME: + case SqlConstant.MAX_TIME: + case SqlConstant.COUNT: + case SqlConstant.TIME_DURATION: + case SqlConstant.COUNT_TIME: + case SqlConstant.AVG: + case SqlConstant.SUM: + case SqlConstant.STDDEV: + case SqlConstant.STDDEV_POP: + case SqlConstant.STDDEV_SAMP: + case SqlConstant.VARIANCE: + case SqlConstant.VAR_POP: + case SqlConstant.VAR_SAMP: + case SqlConstant.LAST_VALUE: + case SqlConstant.FIRST_VALUE: + case SqlConstant.MIN_VALUE: + case SqlConstant.MAX_VALUE: + case SqlConstant.MODE: + case SqlConstant.MAX_BY: + return expressionTypes.get(NodeRef.of(inputExpressions.get(0))); + default: + throw new IllegalArgumentException( + "Invalid Aggregation function: " + aggregateFunctionName); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java index eb036e7799b..2d618f6fa9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.CompleteMeasurementSchemaVisitor; import org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor; +import org.apache.iotdb.db.utils.constant.SqlConstant; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; @@ -79,11 +80,9 @@ public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISch } extendedExpressions.add(actualExpressions); - // We just process first input Expression of AggregationFunction, + // We just process first input Expression of Count_IF, // keep other input Expressions as origin and bind Type - // If AggregationFunction need more than one input series, - // we need to reconsider the process of it - if (functionExpression.isBuiltInAggregationFunctionExpression()) { + if (SqlConstant.COUNT_IF.equalsIgnoreCase(functionExpression.getFunctionName())) { List<Expression> children = functionExpression.getExpressions(); bindTypeForAggregationNonSeriesInputExpressions( functionExpression.getFunctionName(), children, extendedExpressions); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java index 55ace89bc7d..2e623ba17e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java @@ -43,21 +43,20 @@ public class ConcatExpressionWithSuffixPathsVisitor public List<Expression> visitFunctionExpression( FunctionExpression functionExpression, Context context) { List<List<Expression>> extendedExpressions = new ArrayList<>(); - for (Expression suffixExpression : functionExpression.getExpressions()) { - extendedExpressions.add(process(suffixExpression, context)); - - // We just process first input Expression of AggregationFunction, + if (SqlConstant.COUNT_IF.equalsIgnoreCase(functionExpression.getFunctionName())) { + // We just process first input Expression of Count_IF, // keep other input Expressions as origin - // If AggregationFunction need more than one input series, - // we need to reconsider the process of it - if (functionExpression.isBuiltInAggregationFunctionExpression()) { - List<Expression> children = functionExpression.getExpressions(); - for (int i = 1; i < children.size(); i++) { - extendedExpressions.add(Collections.singletonList(children.get(i))); - } - break; + extendedExpressions.add(process(functionExpression.getExpressions().get(0), context)); + List<Expression> children = functionExpression.getExpressions(); + for (int i = 1; i < children.size(); i++) { + extendedExpressions.add(Collections.singletonList(children.get(i))); } + } else { + functionExpression + .getExpressions() + .forEach(expression -> extendedExpressions.add(process(expression, context))); } + List<List<Expression>> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); return reconstructFunctionExpressions(functionExpression, childExpressionsList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 2d31694eab6..57889ffdade 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -2914,6 +2914,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { 1); return; case SqlConstant.COUNT_IF: + case SqlConstant.MAX_BY: checkFunctionExpressionInputSize( functionExpression.getExpressionString(), functionExpression.getExpressions().size(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 094a543a916..5c9213376d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -389,7 +389,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP new Aggregator( AccumulatorFactory.createAccumulator( o.getAggregationType(), - node.getSeriesPath().getSeriesType(), + Collections.singletonList(node.getSeriesPath().getSeriesType()), o.getInputExpressions(), o.getInputAttributes(), ascending), @@ -455,7 +455,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP new Aggregator( AccumulatorFactory.createAccumulator( descriptor.getAggregationType(), - seriesDataType, + Collections.singletonList(seriesDataType), descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending), @@ -467,7 +467,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP new Aggregator( AccumulatorFactory.createAccumulator( descriptor.getAggregationType(), - TSDataType.INT64, + Collections.singletonList(TSDataType.INT64), descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending), @@ -1477,16 +1477,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getGroupByLevelDescriptors(); for (CrossSeriesAggregationDescriptor descriptor : aggregationDescriptors) { List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); - TSDataType seriesDataType = - context - .getTypeProvider() - // get the type of first inputExpression - .getType(descriptor.getInputExpressions().get(0).getExpressionString()); + List<TSDataType> inputDataTypes = + descriptor.getInputExpressions().stream() + .map(x -> context.getTypeProvider().getType(x.getExpressionString())) + .collect(Collectors.toList()); aggregators.add( new Aggregator( AccumulatorFactory.createAccumulator( descriptor.getAggregationType(), - seriesDataType, + inputDataTypes, descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending), @@ -1536,15 +1535,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP continue; } List<InputLocation[]> inputLocations = calcInputLocationList(aggregationDescriptor, layout); - TSDataType seriesDataType = - context - .getTypeProvider() - .getType(aggregationDescriptor.getInputExpressions().get(0).getExpressionString()); + List<TSDataType> inputDataTypes = + aggregationDescriptor.getInputExpressions().stream() + .map(x -> context.getTypeProvider().getType(x.getExpressionString())) + .collect(Collectors.toList()); aggregators.add( new Aggregator( AccumulatorFactory.createAccumulator( aggregationDescriptor.getAggregationType(), - seriesDataType, + inputDataTypes, aggregationDescriptor.getInputExpressions(), aggregationDescriptor.getInputAttributes(), ascending), @@ -1601,10 +1600,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP aggregators.add( SlidingWindowAggregatorFactory.createSlidingWindowAggregator( descriptor.getAggregationType(), - context - .getTypeProvider() - // get the type of first inputExpression - .getType(descriptor.getInputExpressions().get(0).getExpressionString()), + descriptor.getInputExpressions().stream() + .map(x -> context.getTypeProvider().getType(x.getExpressionString())) + .collect(Collectors.toList()), descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending, @@ -1674,10 +1672,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP new Aggregator( AccumulatorFactory.createAccumulator( descriptor.getAggregationType(), - context - .getTypeProvider() - // get the type of first inputExpression - .getType(descriptor.getInputExpressions().get(0).getExpressionString()), + descriptor.getInputExpressions().stream() + .map(x -> context.getTypeProvider().getType(x.getExpressionString())) + .collect(Collectors.toList()), descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java index 190d3d3767a..a0166b0ba7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class AggregationDescriptor { @@ -108,8 +109,13 @@ public class AggregationDescriptor { public List<List<String>> getInputColumnNamesList() { if (step.isInputRaw()) { - return Collections.singletonList( - Collections.singletonList(inputExpressions.get(0).getExpressionString())); + List<String> inputColumnNames = + SqlConstant.COUNT_IF.equalsIgnoreCase(aggregationFuncName) + ? Collections.singletonList(inputExpressions.get(0).getExpressionString()) + : inputExpressions.stream() + .map(Expression::getExpressionString) + .collect(Collectors.toList()); + return Collections.singletonList(inputColumnNames); } return Collections.singletonList(getInputColumnNames()); @@ -163,6 +169,11 @@ public class AggregationDescriptor { case VAR_SAMP: outputAggregationNames.add(SqlConstant.VAR_SAMP); break; + case MAX_BY: + // max_by(x, y) takes x and y as the input + outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_X); + outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_Y); + break; default: outputAggregationNames.add(aggregationFuncName); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java index 524840fb48a..165df8d10c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.parameter; import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.utils.constant.SqlConstant; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { @@ -108,8 +110,13 @@ public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { @Override public List<List<String>> getInputColumnNamesList() { if (step.isInputRaw()) { - return Collections.singletonList( - Collections.singletonList(inputExpressions.get(0).getExpressionString())); + List<String> inputColumnNames = + SqlConstant.COUNT_IF.equalsIgnoreCase(aggregationFuncName) + ? Collections.singletonList(inputExpressions.get(0).getExpressionString()) + : inputExpressions.stream() + .map(Expression::getExpressionString) + .collect(Collectors.toList()); + return Collections.singletonList(inputColumnNames); } List<List<String>> inputColumnNamesList = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java index 21a8e1070b3..b7ba465abf7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java @@ -136,6 +136,7 @@ public class SchemaUtils { case SqlConstant.MIN_VALUE: case SqlConstant.MAX_VALUE: case SqlConstant.MODE: + case SqlConstant.MAX_BY: default: return null; } @@ -169,6 +170,7 @@ public class SchemaUtils { case VARIANCE: case VAR_POP: case VAR_SAMP: + case MAX_BY: return true; default: throw new IllegalArgumentException( @@ -202,6 +204,8 @@ public class SchemaUtils { return Collections.singletonList(TAggregationType.VAR_POP); case VAR_SAMP: return Collections.singletonList(TAggregationType.VAR_SAMP); + case MAX_BY: + return Collections.singletonList(TAggregationType.MAX_BY); case AVG: return Arrays.asList(TAggregationType.COUNT, TAggregationType.SUM); case TIME_DURATION: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java index a214a27b353..93f975d0de4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java @@ -142,6 +142,7 @@ public class TypeInferenceUtils { case SqlConstant.MAX_VALUE: case SqlConstant.EXTREME: case SqlConstant.MODE: + case SqlConstant.MAX_BY: return dataType; case SqlConstant.AVG: case SqlConstant.SUM: @@ -187,6 +188,7 @@ public class TypeInferenceUtils { case SqlConstant.LAST_VALUE: case SqlConstant.TIME_DURATION: case SqlConstant.MODE: + case SqlConstant.MAX_BY: return; case SqlConstant.COUNT_IF: if (dataType != TSDataType.BOOLEAN) { @@ -231,6 +233,7 @@ public class TypeInferenceUtils { case SqlConstant.VARIANCE: case SqlConstant.VAR_POP: case SqlConstant.VAR_SAMP: + case SqlConstant.MAX_BY: return; case SqlConstant.COUNT_IF: Expression keepExpression = inputExpressions.get(1); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java index 75d8a04aac1..f3e13d1cac1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java @@ -47,6 +47,9 @@ public class SqlConstant { public static final String MAX_TIME = "max_time"; public static final String MAX_VALUE = "max_value"; public static final String MIN_VALUE = "min_value"; + public static final String MAX_BY = "max_by"; + public static final String MAX_BY_INPUT_X = "max_by_input_x"; + public static final String MAX_BY_INPUT_Y = "max_by_input_y"; public static final String EXTREME = "extreme"; public static final String FIRST_VALUE = "first_value"; public static final String LAST_VALUE = "last_value"; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java index 941a39dcb53..d0f92222888 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java @@ -81,7 +81,7 @@ public class AccumulatorTest { Accumulator avgAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.AVG, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -134,7 +134,7 @@ public class AccumulatorTest { Accumulator countAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -180,7 +180,7 @@ public class AccumulatorTest { Accumulator countTimeAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.COUNT_TIME, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -223,7 +223,7 @@ public class AccumulatorTest { Accumulator extremeAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.EXTREME, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -263,7 +263,7 @@ public class AccumulatorTest { Accumulator firstValueAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.FIRST_VALUE, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -309,7 +309,7 @@ public class AccumulatorTest { Accumulator lastValueAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.LAST_VALUE, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -354,7 +354,7 @@ public class AccumulatorTest { Accumulator maxTimeAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.MAX_TIME, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -394,7 +394,7 @@ public class AccumulatorTest { Accumulator minTimeAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.MIN_TIME, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -434,7 +434,7 @@ public class AccumulatorTest { Accumulator extremeAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.MAX_VALUE, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -474,7 +474,7 @@ public class AccumulatorTest { Accumulator extremeAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.MIN_VALUE, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -514,7 +514,7 @@ public class AccumulatorTest { Accumulator sumAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.SUM, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -560,7 +560,7 @@ public class AccumulatorTest { Accumulator stddevAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.STDDEV, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -614,7 +614,7 @@ public class AccumulatorTest { Accumulator stddevPopAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.STDDEV_POP, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -661,7 +661,7 @@ public class AccumulatorTest { Accumulator stddevSampAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.STDDEV_SAMP, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -715,7 +715,7 @@ public class AccumulatorTest { Accumulator varianceAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.VARIANCE, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -769,7 +769,7 @@ public class AccumulatorTest { Accumulator varPopAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.VAR_POP, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); @@ -816,7 +816,7 @@ public class AccumulatorTest { Accumulator varSampAccumulator = AccumulatorFactory.createAccumulator( TAggregationType.VAR_SAMP, - TSDataType.DOUBLE, + Collections.singletonList(TSDataType.DOUBLE), Collections.emptyList(), Collections.emptyMap(), true); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java index 0d48616266c..adb99a7d7d0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java @@ -109,7 +109,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -140,7 +140,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), false), @@ -175,7 +175,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( aggregationTypes.get(i), - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -212,7 +212,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( aggregationTypes.get(i), - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -253,7 +253,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( aggregationTypes.get(i), - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), false), @@ -287,7 +287,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -321,7 +321,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -354,7 +354,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -392,7 +392,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( aggregationTypes.get(i), - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -431,7 +431,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), @@ -471,7 +471,7 @@ public class AlignedSeriesAggregationScanOperatorTest { new Aggregator( AccumulatorFactory.createAccumulator( TAggregationType.COUNT, - dataType, + Collections.singletonList(dataType), Collections.emptyList(), Collections.emptyMap(), true), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java index dbcf497d1fc..3f257c1a1bd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java @@ -1197,7 +1197,7 @@ public class OperatorMemoryTest { new Aggregator( AccumulatorFactory.createAccumulator( o.getAggregationType(), - measurementPath.getSeriesType(), + Collections.singletonList(measurementPath.getSeriesType()), Collections.emptyList(), Collections.emptyMap(), true), @@ -1252,7 +1252,7 @@ public class OperatorMemoryTest { new Aggregator( AccumulatorFactory.createAccumulator( o.getAggregationType(), - measurementPath.getSeriesType(), + Collections.singletonList(measurementPath.getSeriesType()), Collections.emptyList(), Collections.emptyMap(), true), @@ -1325,7 +1325,7 @@ public class OperatorMemoryTest { new Aggregator( AccumulatorFactory.createAccumulator( o.getAggregationType(), - measurementPath.getSeriesType(), + Collections.singletonList(measurementPath.getSeriesType()), Collections.emptyList(), Collections.emptyMap(), true), @@ -1405,7 +1405,7 @@ public class OperatorMemoryTest { new Aggregator( AccumulatorFactory.createAccumulator( o.getAggregationType(), - measurementPath.getSeriesType(), + Collections.singletonList(measurementPath.getSeriesType()), Collections.emptyList(), Collections.emptyMap(), true), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java index 299551c6281..2c03d5a43a6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java @@ -262,7 +262,7 @@ public class SlidingWindowAggregationOperatorTest { finalAggregators.add( SlidingWindowAggregatorFactory.createSlidingWindowAggregator( rootAggregationTypes.get(i), - TSDataType.INT32, + Collections.singletonList(TSDataType.INT32), Collections.emptyList(), Collections.emptyMap(), ascending, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java index 8a4f31baa70..c551b02bd8e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java @@ -44,7 +44,8 @@ public enum BuiltinAggregationFunction { STDDEV_SAMP("stddev_samp"), VARIANCE("variance"), VAR_POP("var_pop"), - VAR_SAMP("var_samp"); + VAR_SAMP("var_samp"), + MAX_BY("max_by"); private final String functionName; @@ -91,6 +92,7 @@ public enum BuiltinAggregationFunction { case "variance": case "var_pop": case "var_samp": + case "max_by": return false; default: throw new IllegalArgumentException("Invalid Aggregation function: " + name); @@ -121,6 +123,7 @@ public enum BuiltinAggregationFunction { case "variance": case "var_pop": case "var_samp": + case "max_by": return true; case "count_if": case "count_time": diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index dbdb8bd406f..5a6325cd008 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -196,5 +196,6 @@ enum TAggregationType { STDDEV_SAMP, VARIANCE, VAR_POP, - VAR_SAMP + VAR_SAMP, + MAX_BY } \ No newline at end of file
