This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch max_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11a1b83e75ba6bc8bc2516be3c1a05b635031924
Author: lancelly <[email protected]>
AuthorDate: Thu Jan 18 22:43:23 2024 +0800

    Draft FE for multiInput aggregation
---
 .../constant/BuiltinAggregationFunctionEnum.java   |  1 +
 .../java/org/apache/iotdb/tool/ExportTsFile.java   |  3 +-
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   |  1 +
 .../sql/factory/IoTDBDynamicTableFactory.java      |  1 +
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  3 +-
 .../execution/aggregation/AccumulatorFactory.java  | 43 +++++++++++++++++++++-
 .../execution/aggregation/Aggregator.java          | 17 +++++----
 .../execution/aggregation/MaxByAccumulator.java    |  4 ++
 .../SlidingWindowAggregatorFactory.java            | 10 ++---
 .../plan/analyze/ExpressionTypeAnalyzer.java       | 37 ++++++++++++++++++-
 .../cartesian/BindSchemaForExpressionVisitor.java  |  7 ++--
 .../ConcatExpressionWithSuffixPathsVisitor.java    | 23 ++++++------
 .../db/queryengine/plan/parser/ASTVisitor.java     |  1 +
 .../plan/planner/OperatorTreeGenerator.java        | 41 ++++++++++-----------
 .../plan/parameter/AggregationDescriptor.java      | 15 +++++++-
 .../CrossSeriesAggregationDescriptor.java          | 11 +++++-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |  4 ++
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |  3 ++
 .../iotdb/db/utils/constant/SqlConstant.java       |  3 ++
 .../execution/aggregation/AccumulatorTest.java     | 34 ++++++++---------
 .../AlignedSeriesAggregationScanOperatorTest.java  | 22 +++++------
 .../execution/operator/OperatorMemoryTest.java     |  8 ++--
 .../SlidingWindowAggregationOperatorTest.java      |  2 +-
 .../udf/builtin/BuiltinAggregationFunction.java    |  5 ++-
 .../thrift-commons/src/main/thrift/common.thrift   |  3 +-
 25 files changed, 209 insertions(+), 93 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
index a5d73836715..b1524341812 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
@@ -41,6 +41,7 @@ public enum BuiltinAggregationFunctionEnum {
   COUNT("count"),
   AVG("avg"),
   SUM("sum"),
+  MAX_BY("max_by")
   ;
 
   private final String functionName;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
index 9c9329ead7a..4b69013b12d 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
@@ -178,7 +178,8 @@ public class ExportTsFile extends AbstractTsFileTool {
         || sqlLower.contains("stddev_samp(")
         || sqlLower.contains("variance(")
         || sqlLower.contains("var_pop(")
-        || sqlLower.contains("var_samp(")) {
+        || sqlLower.contains("var_samp(")
+        || sqlLower.contains("max_by(")) {
       IoTPrinter.println("The sql you entered is invalid, please don't use 
aggregate query.");
       System.exit(CODE_ERROR);
     }
diff --git 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index 1d77ab3d072..2d3f661c670 100644
--- 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -220,6 +220,7 @@ public class IoTDBDatabaseMetadata implements 
DatabaseMetaData {
       "LZMA2",
       "LATEST",
       "LIKE",
+      "MAX_BY",
       "METADATA",
       "MERGE",
       "MOVE",
diff --git 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
index c32de37733e..f5305180e95 100644
--- 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
+++ 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
@@ -202,6 +202,7 @@ public class IoTDBDynamicTableFactory
             || sqlLower.contains("last_value(")
             || sqlLower.contains("max_time(")
             || sqlLower.contains("min_time(")
+            || sqlLower.contains("max_by(")
             || sqlLower.contains("stddev(")
             || sqlLower.contains("stddev_pop(")
             || sqlLower.contains("stddev_samp(")
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 645159b75ae..8c21de33986 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -669,7 +669,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
     Aggregator aggregator =
         new Aggregator(
-            AccumulatorFactory.createAccumulator(aggregationType, dataType, 
null, null, true),
+            AccumulatorFactory.createAccumulator(
+                aggregationType, Collections.singletonList(dataType), null, 
null, true),
             AggregationStep.SINGLE,
             Collections.singletonList(new InputLocation[] {new 
InputLocation(0, 0)}));
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
index 40d0a2d9bcd..9c0b7ce7fcb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
@@ -20,18 +20,34 @@
 package org.apache.iotdb.db.queryengine.execution.aggregation;
 
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.binary.CompareBinaryExpression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkState;
+
 public class AccumulatorFactory {
 
   public static Accumulator createAccumulator(
+      TAggregationType aggregationType,
+      List<TSDataType> inputDataTypes,
+      List<Expression> inputExpressions,
+      Map<String, String> inputAttributes,
+      boolean ascending) {
+    return isMultiInputAggregation(aggregationType)
+        ? createAccumulatorWithMultiInput(aggregationType, inputDataTypes)
+        : createSingleInputAccumulator(
+            aggregationType, inputDataTypes.get(0), inputExpressions, 
inputAttributes, ascending);
+  }
+
+  private static Accumulator createSingleInputAccumulator(
       TAggregationType aggregationType,
       TSDataType tsDataType,
       List<Expression> inputExpressions,
@@ -106,6 +122,27 @@ public class AccumulatorFactory {
     }
   }
 
+  public static Accumulator createAccumulatorWithMultiInput(
+      TAggregationType aggregationType, List<TSDataType> inputDataTypes) {
+    switch (aggregationType) {
+      case MAX_BY:
+        checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size.");
+        return new MaxByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
+      default:
+        throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
+    }
+  }
+
+  private static boolean isMultiInputAggregation(TAggregationType 
aggregationType) {
+    switch (aggregationType) {
+      case MAX_BY:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  @TestOnly
   public static List<Accumulator> createAccumulators(
       List<TAggregationType> aggregationTypes,
       TSDataType tsDataType,
@@ -116,7 +153,11 @@ public class AccumulatorFactory {
     for (TAggregationType aggregationType : aggregationTypes) {
       accumulators.add(
           createAccumulator(
-              aggregationType, tsDataType, inputExpressions, inputAttributes, 
ascending));
+              aggregationType,
+              Collections.singletonList(tsDataType),
+              inputExpressions,
+              inputAttributes,
+              ascending));
     }
     return accumulators;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
index f6f46c5dd0b..e99311b8d21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
@@ -68,17 +68,20 @@ public class Aggregator {
       checkArgument(
           step.isInputRaw(),
           "Step in SeriesAggregateScanOperator and RawDataAggregateOperator 
can only process raw input");
-      for (InputLocation[] inputLocations : inputLocationList) {
+      checkArgument(
+          inputLocationList.size() == 1,
+          "inputLocationList.size() in SeriesAggregateScanOperator and 
RawDataAggregateOperator can only be 1.");
+      Column[] timeAndValueColumn = new Column[1 + 
inputLocationList.get(0).length];
+      timeAndValueColumn[0] = tsBlock.getTimeColumn();
+      for (int i = 0; i < inputLocationList.get(0).length; i++) {
         checkArgument(
-            inputLocations[0].getTsBlockIndex() == 0,
+            inputLocationList.get(0)[i].getTsBlockIndex() == 0,
             "RawDataAggregateOperator can only process one tsBlock input.");
-        Column[] timeAndValueColumn = new Column[2];
-        timeAndValueColumn[0] = tsBlock.getTimeColumn();
-        int index = inputLocations[0].getValueColumnIndex();
+        int index = inputLocationList.get(0)[i].getValueColumnIndex();
         // for count_time, time column is also its value column
-        timeAndValueColumn[1] = index == -1 ? timeAndValueColumn[0] : 
tsBlock.getColumn(index);
-        accumulator.addInput(timeAndValueColumn, bitMap, lastIndex);
+        timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : 
tsBlock.getColumn(index);
       }
+      accumulator.addInput(timeAndValueColumn, bitMap, lastIndex);
     } finally {
       QUERY_EXECUTION_METRICS.recordExecutionCost(
           AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
index d393e8507cb..72a00c98b78 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
@@ -223,6 +223,7 @@ public class MaxByAccumulator implements Accumulator {
   private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) {
     if (!initResult || yMaxVal > yMaxValue.getInt()) {
       initResult = true;
+      yMaxValue.setInt(yMaxVal);
       updateX(xColumn, xIndex);
     }
   }
@@ -241,6 +242,7 @@ public class MaxByAccumulator implements Accumulator {
   private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) {
     if (!initResult || yMaxVal > yMaxValue.getLong()) {
       initResult = true;
+      yMaxValue.setLong(yMaxVal);
       updateX(xColumn, xIndex);
     }
   }
@@ -259,6 +261,7 @@ public class MaxByAccumulator implements Accumulator {
   private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) {
     if (!initResult || yMaxVal > yMaxValue.getFloat()) {
       initResult = true;
+      yMaxValue.setFloat(yMaxVal);
       updateX(xColumn, xIndex);
     }
   }
@@ -277,6 +280,7 @@ public class MaxByAccumulator implements Accumulator {
   private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) {
     if (!initResult || yMaxVal > yMaxValue.getDouble()) {
       initResult = true;
+      yMaxValue.setDouble(yMaxVal);
       updateX(xColumn, xIndex);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index ac01208d36f..53e477be245 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -119,7 +119,7 @@ public class SlidingWindowAggregatorFactory {
 
   public static SlidingWindowAggregator createSlidingWindowAggregator(
       TAggregationType aggregationType,
-      TSDataType dataType,
+      List<TSDataType> dataTypes,
       List<Expression> inputExpressions,
       Map<String, String> inputAttributes,
       boolean ascending,
@@ -127,7 +127,7 @@ public class SlidingWindowAggregatorFactory {
       AggregationStep step) {
     Accumulator accumulator =
         AccumulatorFactory.createAccumulator(
-            aggregationType, dataType, inputExpressions, inputAttributes, 
ascending);
+            aggregationType, dataTypes, inputExpressions, inputAttributes, 
ascending);
     switch (aggregationType) {
       case SUM:
       case AVG:
@@ -142,13 +142,13 @@ public class SlidingWindowAggregatorFactory {
         return new SmoothQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step);
       case MAX_VALUE:
         return new MonotonicQueueSlidingWindowAggregator(
-            accumulator, inputLocationList, step, 
maxComparators.get(dataType));
+            accumulator, inputLocationList, step, 
maxComparators.get(dataTypes.get(0)));
       case MIN_VALUE:
         return new MonotonicQueueSlidingWindowAggregator(
-            accumulator, inputLocationList, step, 
minComparators.get(dataType));
+            accumulator, inputLocationList, step, 
minComparators.get(dataTypes.get(0)));
       case EXTREME:
         return new MonotonicQueueSlidingWindowAggregator(
-            accumulator, inputLocationList, step, 
extremeComparators.get(dataType));
+            accumulator, inputLocationList, step, 
extremeComparators.get(dataTypes.get(0)));
       case MIN_TIME:
       case FIRST_VALUE:
         return !ascending
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
index 68a96b29bf8..8f4c1d6fe63 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.unary.RegularExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFInformationInferrer;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -284,7 +285,8 @@ public class ExpressionTypeAnalyzer {
             functionExpression,
             TypeInferenceUtils.getAggrDataType(
                 functionExpression.getFunctionName(),
-                expressionTypes.get(NodeRef.of(inputExpressions.get(0)))));
+                getInputExpressionTypeForAggregation(
+                    inputExpressions, functionExpression.getFunctionName())));
       }
       if (functionExpression.isBuiltInScalarFunction()) {
         return setExpressionType(
@@ -398,4 +400,37 @@ public class ExpressionTypeAnalyzer {
               expressionString, actual.name(), Arrays.toString(expected)));
     }
   }
+
+  private TSDataType getInputExpressionTypeForAggregation(
+      List<Expression> inputExpressions, String aggregateFunctionName) {
+    // Some aggregate functions have a fixed output type, while others 
determine their output type
+    // based on the data type of their input.
+    // Currently, for all aggregate functions without a fixed output type, the 
output type is
+    // determined by the first input.
+    switch (aggregateFunctionName) {
+      case SqlConstant.MIN_TIME:
+      case SqlConstant.MAX_TIME:
+      case SqlConstant.COUNT:
+      case SqlConstant.TIME_DURATION:
+      case SqlConstant.COUNT_TIME:
+      case SqlConstant.AVG:
+      case SqlConstant.SUM:
+      case SqlConstant.STDDEV:
+      case SqlConstant.STDDEV_POP:
+      case SqlConstant.STDDEV_SAMP:
+      case SqlConstant.VARIANCE:
+      case SqlConstant.VAR_POP:
+      case SqlConstant.VAR_SAMP:
+      case SqlConstant.LAST_VALUE:
+      case SqlConstant.FIRST_VALUE:
+      case SqlConstant.MIN_VALUE:
+      case SqlConstant.MAX_VALUE:
+      case SqlConstant.MODE:
+      case SqlConstant.MAX_BY:
+        return expressionTypes.get(NodeRef.of(inputExpressions.get(0)));
+      default:
+        throw new IllegalArgumentException(
+            "Invalid Aggregation function: " + aggregateFunctionName);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
index eb036e7799b..2d618f6fa9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.CompleteMeasurementSchemaVisitor;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.ArrayList;
@@ -79,11 +80,9 @@ public class BindSchemaForExpressionVisitor extends 
CartesianProductVisitor<ISch
       }
       extendedExpressions.add(actualExpressions);
 
-      // We just process first input Expression of AggregationFunction,
+      // We just process first input Expression of Count_IF,
       // keep other input Expressions as origin and bind Type
-      // If AggregationFunction need more than one input series,
-      // we need to reconsider the process of it
-      if (functionExpression.isBuiltInAggregationFunctionExpression()) {
+      if 
(SqlConstant.COUNT_IF.equalsIgnoreCase(functionExpression.getFunctionName())) {
         List<Expression> children = functionExpression.getExpressions();
         bindTypeForAggregationNonSeriesInputExpressions(
             functionExpression.getFunctionName(), children, 
extendedExpressions);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
index 55ace89bc7d..2e623ba17e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
@@ -43,21 +43,20 @@ public class ConcatExpressionWithSuffixPathsVisitor
   public List<Expression> visitFunctionExpression(
       FunctionExpression functionExpression, Context context) {
     List<List<Expression>> extendedExpressions = new ArrayList<>();
-    for (Expression suffixExpression : functionExpression.getExpressions()) {
-      extendedExpressions.add(process(suffixExpression, context));
-
-      // We just process first input Expression of AggregationFunction,
+    if 
(SqlConstant.COUNT_IF.equalsIgnoreCase(functionExpression.getFunctionName())) {
+      // We just process first input Expression of Count_IF,
       // keep other input Expressions as origin
-      // If AggregationFunction need more than one input series,
-      // we need to reconsider the process of it
-      if (functionExpression.isBuiltInAggregationFunctionExpression()) {
-        List<Expression> children = functionExpression.getExpressions();
-        for (int i = 1; i < children.size(); i++) {
-          extendedExpressions.add(Collections.singletonList(children.get(i)));
-        }
-        break;
+      
extendedExpressions.add(process(functionExpression.getExpressions().get(0), 
context));
+      List<Expression> children = functionExpression.getExpressions();
+      for (int i = 1; i < children.size(); i++) {
+        extendedExpressions.add(Collections.singletonList(children.get(i)));
       }
+    } else {
+      functionExpression
+          .getExpressions()
+          .forEach(expression -> extendedExpressions.add(process(expression, 
context)));
     }
+
     List<List<Expression>> childExpressionsList = new ArrayList<>();
     cartesianProduct(extendedExpressions, childExpressionsList, 0, new 
ArrayList<>());
     return reconstructFunctionExpressions(functionExpression, 
childExpressionsList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 2d31694eab6..57889ffdade 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -2914,6 +2914,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
             1);
         return;
       case SqlConstant.COUNT_IF:
+      case SqlConstant.MAX_BY:
         checkFunctionExpressionInputSize(
             functionExpression.getExpressionString(),
             functionExpression.getExpressions().size(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 094a543a916..5c9213376d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -389,7 +389,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 new Aggregator(
                     AccumulatorFactory.createAccumulator(
                         o.getAggregationType(),
-                        node.getSeriesPath().getSeriesType(),
+                        
Collections.singletonList(node.getSeriesPath().getSeriesType()),
                         o.getInputExpressions(),
                         o.getInputAttributes(),
                         ascending),
@@ -455,7 +455,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             new Aggregator(
                 AccumulatorFactory.createAccumulator(
                     descriptor.getAggregationType(),
-                    seriesDataType,
+                    Collections.singletonList(seriesDataType),
                     descriptor.getInputExpressions(),
                     descriptor.getInputAttributes(),
                     ascending),
@@ -467,7 +467,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             new Aggregator(
                 AccumulatorFactory.createAccumulator(
                     descriptor.getAggregationType(),
-                    TSDataType.INT64,
+                    Collections.singletonList(TSDataType.INT64),
                     descriptor.getInputExpressions(),
                     descriptor.getInputAttributes(),
                     ascending),
@@ -1477,16 +1477,15 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         node.getGroupByLevelDescriptors();
     for (CrossSeriesAggregationDescriptor descriptor : aggregationDescriptors) 
{
       List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
-      TSDataType seriesDataType =
-          context
-              .getTypeProvider()
-              // get the type of first inputExpression
-              
.getType(descriptor.getInputExpressions().get(0).getExpressionString());
+      List<TSDataType> inputDataTypes =
+          descriptor.getInputExpressions().stream()
+              .map(x -> 
context.getTypeProvider().getType(x.getExpressionString()))
+              .collect(Collectors.toList());
       aggregators.add(
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   descriptor.getAggregationType(),
-                  seriesDataType,
+                  inputDataTypes,
                   descriptor.getInputExpressions(),
                   descriptor.getInputAttributes(),
                   ascending),
@@ -1536,15 +1535,15 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           continue;
         }
         List<InputLocation[]> inputLocations = 
calcInputLocationList(aggregationDescriptor, layout);
-        TSDataType seriesDataType =
-            context
-                .getTypeProvider()
-                
.getType(aggregationDescriptor.getInputExpressions().get(0).getExpressionString());
+        List<TSDataType> inputDataTypes =
+            aggregationDescriptor.getInputExpressions().stream()
+                .map(x -> 
context.getTypeProvider().getType(x.getExpressionString()))
+                .collect(Collectors.toList());
         aggregators.add(
             new Aggregator(
                 AccumulatorFactory.createAccumulator(
                     aggregationDescriptor.getAggregationType(),
-                    seriesDataType,
+                    inputDataTypes,
                     aggregationDescriptor.getInputExpressions(),
                     aggregationDescriptor.getInputAttributes(),
                     ascending),
@@ -1601,10 +1600,9 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       aggregators.add(
           SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
               descriptor.getAggregationType(),
-              context
-                  .getTypeProvider()
-                  // get the type of first inputExpression
-                  
.getType(descriptor.getInputExpressions().get(0).getExpressionString()),
+              descriptor.getInputExpressions().stream()
+                  .map(x -> 
context.getTypeProvider().getType(x.getExpressionString()))
+                  .collect(Collectors.toList()),
               descriptor.getInputExpressions(),
               descriptor.getInputAttributes(),
               ascending,
@@ -1674,10 +1672,9 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   descriptor.getAggregationType(),
-                  context
-                      .getTypeProvider()
-                      // get the type of first inputExpression
-                      
.getType(descriptor.getInputExpressions().get(0).getExpressionString()),
+                  descriptor.getInputExpressions().stream()
+                      .map(x -> 
context.getTypeProvider().getType(x.getExpressionString()))
+                      .collect(Collectors.toList()),
                   descriptor.getInputExpressions(),
                   descriptor.getInputAttributes(),
                   ascending),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 190d3d3767a..a0166b0ba7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -34,6 +34,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class AggregationDescriptor {
 
@@ -108,8 +109,13 @@ public class AggregationDescriptor {
 
   public List<List<String>> getInputColumnNamesList() {
     if (step.isInputRaw()) {
-      return Collections.singletonList(
-          
Collections.singletonList(inputExpressions.get(0).getExpressionString()));
+      List<String> inputColumnNames =
+          SqlConstant.COUNT_IF.equalsIgnoreCase(aggregationFuncName)
+              ? 
Collections.singletonList(inputExpressions.get(0).getExpressionString())
+              : inputExpressions.stream()
+                  .map(Expression::getExpressionString)
+                  .collect(Collectors.toList());
+      return Collections.singletonList(inputColumnNames);
     }
 
     return Collections.singletonList(getInputColumnNames());
@@ -163,6 +169,11 @@ public class AggregationDescriptor {
         case VAR_SAMP:
           outputAggregationNames.add(SqlConstant.VAR_SAMP);
           break;
+        case MAX_BY:
+          // max_by(x, y) takes x and y as the input
+          outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_X);
+          outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_Y);
+          break;
         default:
           outputAggregationNames.add(aggregationFuncName);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
index 524840fb48a..165df8d10c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
 
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -30,6 +31,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class CrossSeriesAggregationDescriptor extends AggregationDescriptor {
 
@@ -108,8 +110,13 @@ public class CrossSeriesAggregationDescriptor extends 
AggregationDescriptor {
   @Override
   public List<List<String>> getInputColumnNamesList() {
     if (step.isInputRaw()) {
-      return Collections.singletonList(
-          
Collections.singletonList(inputExpressions.get(0).getExpressionString()));
+      List<String> inputColumnNames =
+          SqlConstant.COUNT_IF.equalsIgnoreCase(aggregationFuncName)
+              ? 
Collections.singletonList(inputExpressions.get(0).getExpressionString())
+              : inputExpressions.stream()
+                  .map(Expression::getExpressionString)
+                  .collect(Collectors.toList());
+      return Collections.singletonList(inputColumnNames);
     }
 
     List<List<String>> inputColumnNamesList = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 21a8e1070b3..b7ba465abf7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -136,6 +136,7 @@ public class SchemaUtils {
       case SqlConstant.MIN_VALUE:
       case SqlConstant.MAX_VALUE:
       case SqlConstant.MODE:
+      case SqlConstant.MAX_BY:
       default:
         return null;
     }
@@ -169,6 +170,7 @@ public class SchemaUtils {
       case VARIANCE:
       case VAR_POP:
       case VAR_SAMP:
+      case MAX_BY:
         return true;
       default:
         throw new IllegalArgumentException(
@@ -202,6 +204,8 @@ public class SchemaUtils {
         return Collections.singletonList(TAggregationType.VAR_POP);
       case VAR_SAMP:
         return Collections.singletonList(TAggregationType.VAR_SAMP);
+      case MAX_BY:
+        return Collections.singletonList(TAggregationType.MAX_BY);
       case AVG:
         return Arrays.asList(TAggregationType.COUNT, TAggregationType.SUM);
       case TIME_DURATION:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index a214a27b353..93f975d0de4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -142,6 +142,7 @@ public class TypeInferenceUtils {
       case SqlConstant.MAX_VALUE:
       case SqlConstant.EXTREME:
       case SqlConstant.MODE:
+      case SqlConstant.MAX_BY:
         return dataType;
       case SqlConstant.AVG:
       case SqlConstant.SUM:
@@ -187,6 +188,7 @@ public class TypeInferenceUtils {
       case SqlConstant.LAST_VALUE:
       case SqlConstant.TIME_DURATION:
       case SqlConstant.MODE:
+      case SqlConstant.MAX_BY:
         return;
       case SqlConstant.COUNT_IF:
         if (dataType != TSDataType.BOOLEAN) {
@@ -231,6 +233,7 @@ public class TypeInferenceUtils {
       case SqlConstant.VARIANCE:
       case SqlConstant.VAR_POP:
       case SqlConstant.VAR_SAMP:
+      case SqlConstant.MAX_BY:
         return;
       case SqlConstant.COUNT_IF:
         Expression keepExpression = inputExpressions.get(1);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
index 75d8a04aac1..f3e13d1cac1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
@@ -47,6 +47,9 @@ public class SqlConstant {
   public static final String MAX_TIME = "max_time";
   public static final String MAX_VALUE = "max_value";
   public static final String MIN_VALUE = "min_value";
+  public static final String MAX_BY = "max_by";
+  public static final String MAX_BY_INPUT_X = "max_by_input_x";
+  public static final String MAX_BY_INPUT_Y = "max_by_input_y";
   public static final String EXTREME = "extreme";
   public static final String FIRST_VALUE = "first_value";
   public static final String LAST_VALUE = "last_value";
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
index 941a39dcb53..d0f92222888 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
@@ -81,7 +81,7 @@ public class AccumulatorTest {
     Accumulator avgAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.AVG,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -134,7 +134,7 @@ public class AccumulatorTest {
     Accumulator countAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.COUNT,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -180,7 +180,7 @@ public class AccumulatorTest {
     Accumulator countTimeAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.COUNT_TIME,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -223,7 +223,7 @@ public class AccumulatorTest {
     Accumulator extremeAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.EXTREME,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -263,7 +263,7 @@ public class AccumulatorTest {
     Accumulator firstValueAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.FIRST_VALUE,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -309,7 +309,7 @@ public class AccumulatorTest {
     Accumulator lastValueAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.LAST_VALUE,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -354,7 +354,7 @@ public class AccumulatorTest {
     Accumulator maxTimeAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.MAX_TIME,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -394,7 +394,7 @@ public class AccumulatorTest {
     Accumulator minTimeAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.MIN_TIME,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -434,7 +434,7 @@ public class AccumulatorTest {
     Accumulator extremeAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.MAX_VALUE,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -474,7 +474,7 @@ public class AccumulatorTest {
     Accumulator extremeAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.MIN_VALUE,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -514,7 +514,7 @@ public class AccumulatorTest {
     Accumulator sumAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.SUM,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -560,7 +560,7 @@ public class AccumulatorTest {
     Accumulator stddevAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.STDDEV,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -614,7 +614,7 @@ public class AccumulatorTest {
     Accumulator stddevPopAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.STDDEV_POP,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -661,7 +661,7 @@ public class AccumulatorTest {
     Accumulator stddevSampAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.STDDEV_SAMP,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -715,7 +715,7 @@ public class AccumulatorTest {
     Accumulator varianceAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.VARIANCE,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -769,7 +769,7 @@ public class AccumulatorTest {
     Accumulator varPopAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.VAR_POP,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
@@ -816,7 +816,7 @@ public class AccumulatorTest {
     Accumulator varSampAccumulator =
         AccumulatorFactory.createAccumulator(
             TAggregationType.VAR_SAMP,
-            TSDataType.DOUBLE,
+            Collections.singletonList(TSDataType.DOUBLE),
             Collections.emptyList(),
             Collections.emptyMap(),
             true);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 0d48616266c..adb99a7d7d0 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -109,7 +109,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -140,7 +140,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   false),
@@ -175,7 +175,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   aggregationTypes.get(i),
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -212,7 +212,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   aggregationTypes.get(i),
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -253,7 +253,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   aggregationTypes.get(i),
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   false),
@@ -287,7 +287,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -321,7 +321,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -354,7 +354,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -392,7 +392,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   aggregationTypes.get(i),
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -431,7 +431,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
@@ -471,7 +471,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
           new Aggregator(
               AccumulatorFactory.createAccumulator(
                   TAggregationType.COUNT,
-                  dataType,
+                  Collections.singletonList(dataType),
                   Collections.emptyList(),
                   Collections.emptyMap(),
                   true),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index dbcf497d1fc..3f257c1a1bd 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -1197,7 +1197,7 @@ public class OperatorMemoryTest {
                 new Aggregator(
                     AccumulatorFactory.createAccumulator(
                         o.getAggregationType(),
-                        measurementPath.getSeriesType(),
+                        
Collections.singletonList(measurementPath.getSeriesType()),
                         Collections.emptyList(),
                         Collections.emptyMap(),
                         true),
@@ -1252,7 +1252,7 @@ public class OperatorMemoryTest {
                 new Aggregator(
                     AccumulatorFactory.createAccumulator(
                         o.getAggregationType(),
-                        measurementPath.getSeriesType(),
+                        
Collections.singletonList(measurementPath.getSeriesType()),
                         Collections.emptyList(),
                         Collections.emptyMap(),
                         true),
@@ -1325,7 +1325,7 @@ public class OperatorMemoryTest {
                 new Aggregator(
                     AccumulatorFactory.createAccumulator(
                         o.getAggregationType(),
-                        measurementPath.getSeriesType(),
+                        
Collections.singletonList(measurementPath.getSeriesType()),
                         Collections.emptyList(),
                         Collections.emptyMap(),
                         true),
@@ -1405,7 +1405,7 @@ public class OperatorMemoryTest {
                 new Aggregator(
                     AccumulatorFactory.createAccumulator(
                         o.getAggregationType(),
-                        measurementPath.getSeriesType(),
+                        
Collections.singletonList(measurementPath.getSeriesType()),
                         Collections.emptyList(),
                         Collections.emptyMap(),
                         true),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java
index 299551c6281..2c03d5a43a6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -262,7 +262,7 @@ public class SlidingWindowAggregationOperatorTest {
       finalAggregators.add(
           SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
               rootAggregationTypes.get(i),
-              TSDataType.INT32,
+              Collections.singletonList(TSDataType.INT32),
               Collections.emptyList(),
               Collections.emptyMap(),
               ascending,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
index 8a4f31baa70..c551b02bd8e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
@@ -44,7 +44,8 @@ public enum BuiltinAggregationFunction {
   STDDEV_SAMP("stddev_samp"),
   VARIANCE("variance"),
   VAR_POP("var_pop"),
-  VAR_SAMP("var_samp");
+  VAR_SAMP("var_samp"),
+  MAX_BY("max_by");
 
   private final String functionName;
 
@@ -91,6 +92,7 @@ public enum BuiltinAggregationFunction {
       case "variance":
       case "var_pop":
       case "var_samp":
+      case "max_by":
         return false;
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
name);
@@ -121,6 +123,7 @@ public enum BuiltinAggregationFunction {
       case "variance":
       case "var_pop":
       case "var_samp":
+      case "max_by":
         return true;
       case "count_if":
       case "count_time":
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index dbdb8bd406f..5a6325cd008 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -196,5 +196,6 @@ enum TAggregationType {
   STDDEV_SAMP,
   VARIANCE,
   VAR_POP,
-  VAR_SAMP
+  VAR_SAMP,
+  MAX_BY
 }
\ No newline at end of file

Reply via email to