This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch max_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a0031a8eb83f9d080b5cfce2a57076aa40ac673c
Author: lancelly <[email protected]>
AuthorDate: Sat Jan 27 19:21:34 2024 +0800

    Support GroupByLevel and add related ITs
---
 .../it/aggregation/IoTDBAggregationByLevelIT.java  |  37 ++++----
 .../db/it/aggregation/IoTDBAggregationIT.java      |   5 +-
 .../maxby/IoTDBMaxByAlignedSeriesIT.java           |   8 ++
 .../db/it/aggregation/maxby/IoTDBMaxByIT.java      | 102 +++++++++++++++++----
 .../execution/aggregation/AccumulatorFactory.java  |   2 +-
 .../execution/aggregation/MaxByAccumulator.java    |  59 +++++++-----
 .../SlidingWindowAggregatorFactory.java            |  12 ++-
 .../ReplaceRawPathWithGroupedPathVisitor.java      |   7 +-
 .../cartesian/BindSchemaForPredicateVisitor.java   |   6 +-
 .../plan/planner/LogicalPlanBuilder.java           |  41 ++++++---
 .../plan/planner/OperatorTreeGenerator.java        |  10 +-
 .../plan/planner/distribution/SourceRewriter.java  |  17 ++--
 .../CrossSeriesAggregationDescriptor.java          |  67 ++++++++++----
 .../plan/analyze/AggregationDescriptorTest.java    |   8 +-
 .../plan/planner/LogicalPlannerTest.java           |   2 +-
 .../plan/planner/QueryLogicalPlanUtil.java         |  27 ++++--
 .../distribution/AggregationDistributionTest.java  |  29 ++++--
 .../node/process/GroupByLevelNodeSerdeTest.java    |   3 +-
 .../node/process/GroupByTagNodeSerdeTest.java      |  22 +++--
 19 files changed, 314 insertions(+), 150 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
index 4207e21c78f..e61e81b9ccb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
@@ -283,37 +283,34 @@ public class IoTDBAggregationByLevelIT {
         }
       }
 
-
       try (ResultSet resultSet =
-                   statement.executeQuery(
-                           "select max_by(temperature) from root.sg1.* GROUP 
BY level=0")) {
+          statement.executeQuery("select max_by(temperature) from root.sg1.* 
GROUP BY level=0")) {
         while (resultSet.next()) {
           String ans =
-                  resultSet.getString(lastValue("root.*.d1.temperature"))
-                          + ","
-                          + 
resultSet.getString(lastValue("root.*.d2.temperature"))
-                          + ","
-                          + 
resultSet.getString(maxValue("root.*.d1.temperature"))
-                          + ","
-                          + 
resultSet.getString(maxValue("root.*.d2.temperature"));
+              resultSet.getString(lastValue("root.*.d1.temperature"))
+                  + ","
+                  + resultSet.getString(lastValue("root.*.d2.temperature"))
+                  + ","
+                  + resultSet.getString(maxValue("root.*.d1.temperature"))
+                  + ","
+                  + resultSet.getString(maxValue("root.*.d2.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
       }
 
-
       try (ResultSet resultSet =
-                   statement.executeQuery(
-                           "select last_value(temperature), 
max_value(temperature) from root.sg1.* GROUP BY level=2")) {
+          statement.executeQuery(
+              "select last_value(temperature), max_value(temperature) from 
root.sg1.* GROUP BY level=2")) {
         while (resultSet.next()) {
           String ans =
-                  resultSet.getString(lastValue("root.*.d1.temperature"))
-                          + ","
-                          + 
resultSet.getString(lastValue("root.*.d2.temperature"))
-                          + ","
-                          + 
resultSet.getString(maxValue("root.*.d1.temperature"))
-                          + ","
-                          + 
resultSet.getString(maxValue("root.*.d2.temperature"));
+              resultSet.getString(lastValue("root.*.d1.temperature"))
+                  + ","
+                  + resultSet.getString(lastValue("root.*.d2.temperature"))
+                  + ","
+                  + resultSet.getString(maxValue("root.*.d1.temperature"))
+                  + ","
+                  + resultSet.getString(maxValue("root.*.d2.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
index 1dd8fca19cd..3bf0ef7fc84 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
@@ -992,7 +992,7 @@ public class IoTDBAggregationIT {
       int cnt;
       try (ResultSet resultSet =
           statement.executeQuery(
-              "SELECT max_value(time, s0) "
+              "SELECT max_by(time, s0) "
                   + "FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000")) 
{
         cnt = 0;
         while (resultSet.next()) {
@@ -1005,8 +1005,7 @@ public class IoTDBAggregationIT {
       }
 
       try (ResultSet resultSet =
-          statement.executeQuery(
-              "SELECT max_value(time,s0) FROM root.vehicle.d0 WHERE time < 
2500")) {
+          statement.executeQuery("SELECT max_by(time,s0) FROM root.vehicle.d0 
WHERE time < 2500")) {
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(maxBy("time", d0s0));
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
index 648b9e7b388..05af62d8552 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
@@ -40,6 +40,10 @@ public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT {
         "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 
4, 4, false, \"4\")",
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 
3, 3, false, \"3\")",
         "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 
8, 8, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 
3, 3, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 
9, 9, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 
4, 4, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 
9, 9, false, \"4\")",
         "flush",
         // For Align By Device
         "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, x4 
DOUBLE, x5 BOOLEAN, x6 TEXT)",
@@ -52,6 +56,10 @@ public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT {
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 
4, 4, false, \"4\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 
3, 3, false, \"3\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 
8, 8, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 
3, 3, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 8, 8, 
8, 8, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 
4, 4, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 8, 8, 
8, 8, false, \"4\")",
       };
 
   @BeforeClass
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
index 29314a95f53..cfad1e26211 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
@@ -73,6 +73,10 @@ public class IoTDBMaxByIT {
         "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 
4, 4, false, \"4\")",
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 
3, 3, false, \"3\")",
         "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 
8, 8, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 
3, 3, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 
9, 9, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 
4, 4, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 
9, 9, false, \"4\")",
         "flush",
 
         // For Align By Device
@@ -96,6 +100,10 @@ public class IoTDBMaxByIT {
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 
2, 2, true, \"4\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 
3, 3, false, \"3\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 
1, 1, false, \"1\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 
3, 3, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 
9, 9, false, \"1\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 
4, 4, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 
9, 9, false, \"1\")",
         "flush"
       };
 
@@ -321,7 +329,7 @@ public class IoTDBMaxByIT {
   }
 
   @Test
-  public void testMaxByWithGroupBy() {
+  public void testMaxByWithGroupByTime() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       String[] expectedHeader =
@@ -334,17 +342,33 @@ public class IoTDBMaxByIT {
             "max_by(root.db.d1.x5, root.db.d1.y2)",
             "max_by(root.db.d1.x6, root.db.d1.y2)",
           };
-      String[] retArray =
+      String y = "y2";
+      // order by time ASC
+      String[] retArray1 =
           new String[] {
             "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", 
"8,3,3,3.0,3.0,false,3,"
           };
-      String y = "y2";
       resultSetEqualTest(
           String.format(
-              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms)",
+              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 where time <= 10 group by ([0,9),4ms) ",
               y, y, y, y, y, y),
           expectedHeader,
-          retArray);
+          retArray1);
+
+      // order by time DESC
+      String[] retArray2 =
+          new String[] {
+            "12,3,3,3.0,3.0,false,3,",
+            "8,3,3,3.0,3.0,false,3,",
+            "4,null,null,null,null,null,null,",
+            "0,3,3,3.0,3.0,false,3,"
+          };
+      resultSetEqualTest(
+          String.format(
+              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,13),4ms) order by time desc",
+              y, y, y, y, y, y),
+          expectedHeader,
+          retArray2);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -377,7 +401,7 @@ public class IoTDBMaxByIT {
             };
         resultSetEqualTest(
             String.format(
-                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms,2ms)",
+                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 where time <= 10 group by ([0,9),4ms,2ms) ",
                 y, y, y, y, y, y),
             expectedHeader,
             retArray);
@@ -388,16 +412,62 @@ public class IoTDBMaxByIT {
     }
   }
 
-  // test max_by different types of x
-  // test max_by time
-  // test max_by with expression
-  // test max_by align by device
-  // test max_by group by time
-  // test max_by sliding window
-  // test max_by aligned series
-  // test max_by multi data region
-  // test max_by group by level
-  // test max_by having
+  @Test
+  public void testMaxByWithHaving() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String[] expectedHeader =
+          new String[] {
+            TIMESTAMP_STR,
+            "max_by(root.db.d1.x1, root.db.d1.y2)",
+            "max_by(root.db.d1.x2, root.db.d1.y2)",
+            "max_by(root.db.d1.x3, root.db.d1.y2)",
+            "max_by(root.db.d1.x4, root.db.d1.y2)",
+            "max_by(root.db.d1.x5, root.db.d1.y2)",
+            "max_by(root.db.d1.x6, root.db.d1.y2)",
+          };
+      String[] retArray = new String[] {"8,3,3,3.0,3.0,false,3,"};
+      String y = "y2";
+      resultSetEqualTest(
+          String.format(
+              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms) having max_by(time, %s) > 4",
+              y, y, y, y, y, y, y),
+          expectedHeader,
+          retArray);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMaxByWithGroupByLevel() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String[] retArray = new String[] {"3,3,3.0,3.0,false,3,"};
+      String[] yArray = new String[] {"y1", "y2", "y3", "y4"};
+      for (String y : yArray) {
+        String[] expectedHeader =
+            new String[] {
+              String.format("max_by(root.*.*.x1, root.*.*.%s)", y),
+              String.format("max_by(root.*.*.x2, root.*.*.%s)", y),
+              String.format("max_by(root.*.*.x3, root.*.*.%s)", y),
+              String.format("max_by(root.*.*.x4, root.*.*.%s)", y),
+              String.format("max_by(root.*.*.x5, root.*.*.%s)", y),
+              String.format("max_by(root.*.*.x6, root.*.*.%s)", y),
+            };
+        resultSetEqualTest(
+            String.format(
+                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.** group by level = 0",
+                y, y, y, y, y, y),
+            expectedHeader,
+            retArray);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 
   /** @return yInput -> expectedHeader */
   private Map<String, String[]> generateExpectedHeadersForMaxByTest(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
index 9c0b7ce7fcb..68f233ad8d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
@@ -133,7 +133,7 @@ public class AccumulatorFactory {
     }
   }
 
-  private static boolean isMultiInputAggregation(TAggregationType 
aggregationType) {
+  public static boolean isMultiInputAggregation(TAggregationType 
aggregationType) {
     switch (aggregationType) {
       case MAX_BY:
         return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
index 922acc45aa3..7f101e982d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
@@ -52,6 +52,8 @@ public class MaxByAccumulator implements Accumulator {
 
   private boolean initResult;
 
+  private long yTimeStamp = Long.MAX_VALUE;
+
   private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy: %s";
 
   public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
@@ -138,6 +140,7 @@ public class MaxByAccumulator implements Accumulator {
     xNull = true;
     this.xResult.reset();
     this.yMaxValue.reset();
+    yTimeStamp = Long.MAX_VALUE;
   }
 
   @Override
@@ -161,14 +164,17 @@ public class MaxByAccumulator implements Accumulator {
         continue;
       }
       if (!column[2].isNull(i)) {
-        updateIntResult(column[2].getInt(i), column[1], i);
+        updateIntResult(column[0].getLong(i), column[2].getInt(i), column[1], 
i);
       }
     }
   }
 
-  private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) {
-    if (!initResult || yMaxVal > yMaxValue.getInt()) {
+  private void updateIntResult(long time, int yMaxVal, Column xColumn, int 
xIndex) {
+    if (!initResult
+        || yMaxVal > yMaxValue.getInt()
+        || (yMaxVal == yMaxValue.getInt() && time < yTimeStamp)) {
       initResult = true;
+      yTimeStamp = time;
       yMaxValue.setInt(yMaxVal);
       updateX(xColumn, xIndex);
     }
@@ -180,14 +186,17 @@ public class MaxByAccumulator implements Accumulator {
         continue;
       }
       if (!column[2].isNull(i)) {
-        updateLongResult(column[2].getLong(i), column[1], i);
+        updateLongResult(column[0].getLong(i), column[2].getLong(i), 
column[1], i);
       }
     }
   }
 
-  private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) {
-    if (!initResult || yMaxVal > yMaxValue.getLong()) {
+  private void updateLongResult(long time, long yMaxVal, Column xColumn, int 
xIndex) {
+    if (!initResult
+        || yMaxVal > yMaxValue.getLong()
+        || (yMaxVal == yMaxValue.getLong() && time < yTimeStamp)) {
       initResult = true;
+      yTimeStamp = time;
       yMaxValue.setLong(yMaxVal);
       updateX(xColumn, xIndex);
     }
@@ -199,14 +208,17 @@ public class MaxByAccumulator implements Accumulator {
         continue;
       }
       if (!column[2].isNull(i)) {
-        updateFloatResult(column[2].getFloat(i), column[1], i);
+        updateFloatResult(column[0].getLong(i), column[2].getFloat(i), 
column[1], i);
       }
     }
   }
 
-  private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) {
-    if (!initResult || yMaxVal > yMaxValue.getFloat()) {
+  private void updateFloatResult(long time, float yMaxVal, Column xColumn, int 
xIndex) {
+    if (!initResult
+        || yMaxVal > yMaxValue.getFloat()
+        || (yMaxVal == yMaxValue.getFloat() && time < yTimeStamp)) {
       initResult = true;
+      yTimeStamp = time;
       yMaxValue.setFloat(yMaxVal);
       updateX(xColumn, xIndex);
     }
@@ -218,14 +230,17 @@ public class MaxByAccumulator implements Accumulator {
         continue;
       }
       if (!column[2].isNull(i)) {
-        updateDoubleResult(column[2].getDouble(i), column[1], i);
+        updateDoubleResult(column[0].getLong(i), column[2].getDouble(i), 
column[1], i);
       }
     }
   }
 
-  private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) {
-    if (!initResult || yMaxVal > yMaxValue.getDouble()) {
+  private void updateDoubleResult(long time, double yMaxVal, Column xColumn, 
int xIndex) {
+    if (!initResult
+        || yMaxVal > yMaxValue.getDouble()
+        || (yMaxVal == yMaxValue.getDouble() && time < yTimeStamp)) {
       initResult = true;
+      yTimeStamp = time;
       yMaxValue.setDouble(yMaxVal);
       updateX(xColumn, xIndex);
     }
@@ -295,6 +310,7 @@ public class MaxByAccumulator implements Accumulator {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
     try {
+      dataOutputStream.writeLong(yTimeStamp);
       writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream);
       dataOutputStream.writeBoolean(xNull);
       if (!xNull) {
@@ -335,34 +351,35 @@ public class MaxByAccumulator implements Accumulator {
   }
 
   private void updateFromBytesIntermediateInput(byte[] bytes) {
-    int offset = 0;
+    long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
+    int offset = Long.BYTES;
     // Use Column to store x value
     TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(xDataType));
     ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
     switch (yDataType) {
       case INT32:
-        int intMaxVal = BytesUtils.bytesToInt(bytes, 0);
+        int intMaxVal = BytesUtils.bytesToInt(bytes, offset);
         offset += Integer.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateIntResult(intMaxVal, columnBuilder.build(), 0);
+        updateIntResult(time, intMaxVal, columnBuilder.build(), 0);
         break;
       case INT64:
-        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 
0);
+        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 
offset);
         offset += Long.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateLongResult(longMaxVal, columnBuilder.build(), 0);
+        updateLongResult(time, longMaxVal, columnBuilder.build(), 0);
         break;
       case FLOAT:
-        float floatMaxVal = BytesUtils.bytesToFloat(bytes, 0);
+        float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
         offset += Float.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateFloatResult(floatMaxVal, columnBuilder.build(), 0);
+        updateFloatResult(time, floatMaxVal, columnBuilder.build(), 0);
         break;
       case DOUBLE:
-        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, 0);
+        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset);
         offset += Long.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0);
+        updateDoubleResult(time, doubleMaxVal, columnBuilder.build(), 0);
         break;
       case TEXT:
       case BOOLEAN:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index cfc6461d1bc..ce58e387cc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -122,17 +122,21 @@ public class SlidingWindowAggregatorFactory {
     // Intermediate Value of maxBy is a byte array: | y | xNull | x |
     maxByComparators.put(
         TSDataType.INT32,
-        Comparator.comparingInt(o -> 
BytesUtils.bytesToInt(o.getBinary(0).getValues(), 0)));
+        Comparator.comparingInt(
+            o -> BytesUtils.bytesToInt(o.getBinary(0).getValues(), 
Long.BYTES)));
     maxByComparators.put(
         TSDataType.INT64,
         Comparator.comparingLong(
-            o -> BytesUtils.bytesToLongFromOffset(o.getBinary(0).getValues(), 
Long.BYTES, 0)));
+            o ->
+                BytesUtils.bytesToLongFromOffset(
+                    o.getBinary(0).getValues(), Long.BYTES, Long.BYTES)));
     maxByComparators.put(
         TSDataType.FLOAT,
-        Comparator.comparing(o -> 
BytesUtils.bytesToFloat(o.getBinary(0).getValues(), 0)));
+        Comparator.comparing(o -> 
BytesUtils.bytesToFloat(o.getBinary(0).getValues(), Long.BYTES)));
     maxByComparators.put(
         TSDataType.DOUBLE,
-        Comparator.comparingDouble(o -> 
BytesUtils.bytesToDouble(o.getBinary(0).getValues(), 0)));
+        Comparator.comparingDouble(
+            o -> BytesUtils.bytesToDouble(o.getBinary(0).getValues(), 
Long.BYTES)));
   }
 
   public static SlidingWindowAggregator createSlidingWindowAggregator(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java
index 1ec879a7f81..424e7287741 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -43,10 +44,8 @@ public class ReplaceRawPathWithGroupedPathVisitor
     for (Expression childExpression : functionExpression.getExpressions()) {
       childrenExpressions.add(process(childExpression, context));
 
-      // We just process first input Expression of AggregationFunction.
-      // If AggregationFunction need more than one input series,
-      // we need to reconsider the process of it
-      if (functionExpression.isBuiltInAggregationFunctionExpression()) {
+      // We just process first input Expression of Count_IF
+      if 
(SqlConstant.COUNT_IF.equalsIgnoreCase(functionExpression.getFunctionName())) {
         List<Expression> children = functionExpression.getExpressions();
         for (int i = 1; i < children.size(); i++) {
           childrenExpressions.add(children.get(i));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
index bca177d4505..968d6b51fd3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
@@ -88,11 +88,9 @@ public class BindSchemaForPredicateVisitor
               suffixExpression,
               new Context(context.getPrefixPaths(), context.getSchemaTree(), 
false)));
 
-      // We just process first input Expression of AggregationFunction,
+      // We just process first input Expression of Count_IF,
       // keep other input Expressions as origin and bind Type
-      // If AggregationFunction need more than one input series,
-      // we need to reconsider the process of it
-      if (predicate.isBuiltInAggregationFunctionExpression()) {
+      if (SqlConstant.COUNT_IF.equalsIgnoreCase(predicate.getFunctionName())) {
         List<Expression> children = predicate.getExpressions();
         bindTypeForAggregationNonSeriesInputExpressions(
             predicate.getFunctionName(), children, extendedExpressions);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 68dff31c74f..f6000913991 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
@@ -701,12 +702,8 @@ public class LogicalPlanBuilder {
     List<String> partialAggregationsNames =
         
SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
     String inputExpressionStr = 
getInputExpressionString(aggregationDescriptor);
-    for (String partialAggregationName : partialAggregationsNames) {
-      TSDataType aggregationType = 
SchemaUtils.getAggregationType(partialAggregationName);
-      typeProvider.setType(
-          String.format("%s(%s)", partialAggregationName, inputExpressionStr),
-          aggregationType == null ? typeProvider.getType(inputExpressionStr) : 
aggregationType);
-    }
+    partialAggregationsNames.forEach(
+        x -> setTypeForPartialAggregation(typeProvider, x, 
inputExpressionStr));
   }
 
   private static String getInputExpressionString(AggregationDescriptor 
aggregationDescriptor) {
@@ -718,15 +715,30 @@ public class LogicalPlanBuilder {
     }
   }
 
+  private static void setTypeForPartialAggregation(
+      TypeProvider typeProvider, String partialAggregationName, String 
inputExpressionStr) {
+    TSDataType aggregationType = 
SchemaUtils.getAggregationType(partialAggregationName);
+    typeProvider.setType(
+        String.format("%s(%s)", partialAggregationName, inputExpressionStr),
+        aggregationType == null ? typeProvider.getType(inputExpressionStr) : 
aggregationType);
+  }
+
   public static void updateTypeProviderByPartialAggregation(
       CrossSeriesAggregationDescriptor aggregationDescriptor, TypeProvider 
typeProvider) {
     List<String> partialAggregationsNames =
         
SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
-    PartialPath path = ((TimeSeriesOperand) 
aggregationDescriptor.getOutputExpression()).getPath();
-    for (String partialAggregationName : partialAggregationsNames) {
-      typeProvider.setType(
-          String.format("%s(%s)", partialAggregationName, path.getFullPath()),
-          SchemaUtils.getSeriesTypeByPath(path, partialAggregationName));
+    if 
(!AccumulatorFactory.isMultiInputAggregation(aggregationDescriptor.getAggregationType()))
 {
+      PartialPath path =
+          ((TimeSeriesOperand) 
aggregationDescriptor.getOutputExpressions().get(0)).getPath();
+      for (String partialAggregationName : partialAggregationsNames) {
+        typeProvider.setType(
+            String.format("%s(%s)", partialAggregationName, 
path.getFullPath()),
+            SchemaUtils.getSeriesTypeByPath(path, partialAggregationName));
+      }
+    } else {
+      String inputExpressionStr = 
aggregationDescriptor.getOutputExpressionsAsBuilder().toString();
+      partialAggregationsNames.forEach(
+          x -> setTypeForPartialAggregation(typeProvider, x, 
inputExpressionStr));
     }
   }
 
@@ -1027,12 +1039,13 @@ public class LogicalPlanBuilder {
                   .collect(Collectors.toList()),
               entry.getValue().size(),
               ((FunctionExpression) entry.getKey()).getFunctionAttributes(),
-              entry.getKey().getExpressions().get(0)));
+              entry.getKey().getExpressions()));
     }
     updateTypeProvider(groupByLevelExpressions.keySet());
     updateTypeProvider(
         groupByLevelDescriptors.stream()
-            .map(CrossSeriesAggregationDescriptor::getOutputExpression)
+            .map(CrossSeriesAggregationDescriptor::getOutputExpressions)
+            .flatMap(Collection::stream)
             .collect(Collectors.toList()));
     return new GroupByLevelNode(
         context.getQueryId().genPlanNodeId(),
@@ -1070,7 +1083,7 @@ public class LogicalPlanBuilder {
                     curStep,
                     groupedTimeseriesOperands.get(expression),
                     ((FunctionExpression) expression).getFunctionAttributes(),
-                    expression.getExpressions().get(0));
+                    expression.getExpressions());
             aggregationDescriptors.add(aggregationDescriptor);
             added = true;
             break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 7cee314068f..081edad29fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -265,6 +265,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -1477,9 +1478,14 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         node.getGroupByLevelDescriptors();
     for (CrossSeriesAggregationDescriptor descriptor : aggregationDescriptors) 
{
       List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
+      // Use the first set of InputExpression
       List<TSDataType> inputDataTypes =
-          descriptor.getInputExpressions().stream()
-              .map(x -> 
context.getTypeProvider().getType(x.getExpressionString()))
+          IntStream.range(0, descriptor.getExpressionNumOfOneInput())
+              .mapToObj(
+                  x ->
+                      context
+                          .getTypeProvider()
+                          
.getType(descriptor.getInputExpressions().get(x).getExpressionString()))
               .collect(Collectors.toList());
       aggregators.add(
           new Aggregator(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 4cf27f21fb9..f1268ec2db8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -1117,9 +1117,9 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       for (Expression exp : originalDescriptor.getInputExpressions()) {
         columnNameToExpression.put(exp.getExpressionString(), exp);
       }
-      columnNameToExpression.put(
-          originalDescriptor.getOutputExpression().getExpressionString(),
-          originalDescriptor.getOutputExpression());
+      originalDescriptor
+          .getOutputExpressions()
+          .forEach(x -> columnNameToExpression.put(x.getExpressionString(), 
x));
     }
 
     context.setColumnNameToExpression(columnNameToExpression);
@@ -1285,9 +1285,14 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
           handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
 
-        if 
(childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
-          descriptorExpressions.add(originalDescriptor.getOutputExpression());
-        }
+        originalDescriptor
+            .getOutputExpressions()
+            .forEach(
+                x -> {
+                  if (childrenExpressionSet.contains(x)) {
+                    descriptorExpressions.add(x);
+                  }
+                });
 
         for (Expression exp : originalDescriptor.getInputExpressions()) {
           if (childrenExpressionSet.contains(exp)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
index f03285e599a..ac33b3ee2ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
 
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -35,7 +36,7 @@ import java.util.stream.Collectors;
 
 public class CrossSeriesAggregationDescriptor extends AggregationDescriptor {
 
-  private final Expression outputExpression;
+  private final List<Expression> outputExpressions;
 
   /**
    * Records how many Expressions are in one input, used for calculation of 
inputColumnNames
@@ -51,9 +52,9 @@ public class CrossSeriesAggregationDescriptor extends 
AggregationDescriptor {
       List<Expression> inputExpressions,
       int numberOfInput,
       Map<String, String> inputAttributes,
-      Expression outputExpression) {
+      List<Expression> outputExpressions) {
     super(aggregationFuncName, step, inputExpressions, inputAttributes);
-    this.outputExpression = outputExpression;
+    this.outputExpressions = outputExpressions;
     this.expressionNumOfOneInput = inputExpressions.size() / numberOfInput;
   }
 
@@ -66,23 +67,23 @@ public class CrossSeriesAggregationDescriptor extends 
AggregationDescriptor {
       AggregationStep step,
       List<Expression> inputExpressions,
       Map<String, String> inputAttributes,
-      Expression outputExpression) {
+      List<Expression> outputExpressions) {
     super(aggregationFuncName, step, inputExpressions, inputAttributes);
-    this.outputExpression = outputExpression;
+    this.outputExpressions = outputExpressions;
     this.expressionNumOfOneInput = 1;
   }
 
   public CrossSeriesAggregationDescriptor(
       AggregationDescriptor aggregationDescriptor,
-      Expression outputExpression,
+      List<Expression> outputExpressions,
       int expressionNumOfOneInput) {
     super(aggregationDescriptor);
-    this.outputExpression = outputExpression;
+    this.outputExpressions = outputExpressions;
     this.expressionNumOfOneInput = expressionNumOfOneInput;
   }
 
-  public Expression getOutputExpression() {
-    return outputExpression;
+  public List<Expression> getOutputExpressions() {
+    return outputExpressions;
   }
 
   /**
@@ -97,11 +98,16 @@ public class CrossSeriesAggregationDescriptor extends 
AggregationDescriptor {
   @Override
   public String getParametersString() {
     if (parametersString == null) {
-      StringBuilder builder = new 
StringBuilder(outputExpression.getExpressionString());
-      for (int i = 1; i < expressionNumOfOneInput; i++) {
-        builder.append(", 
").append(inputExpressions.get(i).getExpressionString());
+      StringBuilder builder;
+      if (TAggregationType.COUNT_IF.equals(aggregationType)) {
+        builder = new 
StringBuilder(outputExpressions.get(0).getExpressionString());
+        for (int i = 1; i < expressionNumOfOneInput; i++) {
+          builder.append(", 
").append(inputExpressions.get(i).getExpressionString());
+        }
+        appendAttributes(builder);
+      } else {
+        builder = getOutputExpressionsAsBuilder();
       }
-      appendAttributes(builder);
       parametersString = builder.toString();
     }
     return parametersString;
@@ -151,31 +157,48 @@ public class CrossSeriesAggregationDescriptor extends 
AggregationDescriptor {
     return builder.toString();
   }
 
+  public StringBuilder getOutputExpressionsAsBuilder() {
+    StringBuilder builder = new 
StringBuilder(outputExpressions.get(0).getExpressionString());
+    for (int i = 1; i < outputExpressions.size(); i++) {
+      builder.append(", 
").append(outputExpressions.get(i).getExpressionString());
+    }
+    appendAttributes(builder);
+    return builder;
+  }
+
   @Override
   public CrossSeriesAggregationDescriptor deepClone() {
-    return new CrossSeriesAggregationDescriptor(this, outputExpression, 
expressionNumOfOneInput);
+    return new CrossSeriesAggregationDescriptor(this, outputExpressions, 
expressionNumOfOneInput);
   }
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     super.serialize(byteBuffer);
-    Expression.serialize(outputExpression, byteBuffer);
+    ReadWriteIOUtils.write(outputExpressions.size(), byteBuffer);
+    outputExpressions.forEach(x -> Expression.serialize(x, byteBuffer));
     ReadWriteIOUtils.write(expressionNumOfOneInput, byteBuffer);
   }
 
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
-    Expression.serialize(outputExpression, stream);
+    ReadWriteIOUtils.write(outputExpressions.size(), stream);
+    for (Expression x : outputExpressions) {
+      Expression.serialize(x, stream);
+    }
     ReadWriteIOUtils.write(expressionNumOfOneInput, stream);
   }
 
   public static CrossSeriesAggregationDescriptor deserialize(ByteBuffer 
byteBuffer) {
     AggregationDescriptor aggregationDescriptor = 
AggregationDescriptor.deserialize(byteBuffer);
-    Expression outputExpression = Expression.deserialize(byteBuffer);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Expression> outputExpressions = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      outputExpressions.add(Expression.deserialize(byteBuffer));
+    }
     int expressionNumOfOneInput = ReadWriteIOUtils.readInt(byteBuffer);
     return new CrossSeriesAggregationDescriptor(
-        aggregationDescriptor, outputExpression, expressionNumOfOneInput);
+        aggregationDescriptor, outputExpressions, expressionNumOfOneInput);
   }
 
   @Override
@@ -190,11 +213,15 @@ public class CrossSeriesAggregationDescriptor extends 
AggregationDescriptor {
       return false;
     }
     CrossSeriesAggregationDescriptor that = (CrossSeriesAggregationDescriptor) 
o;
-    return Objects.equals(outputExpression, that.outputExpression);
+    return Objects.equals(outputExpressions, that.outputExpressions);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), outputExpression);
+    return Objects.hash(super.hashCode(), outputExpressions);
+  }
+
+  public int getExpressionNumOfOneInput() {
+    return expressionNumOfOneInput;
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java
index e109445da20..2e4c2daf9d7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java
@@ -99,7 +99,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d1.s1"))),
             2,
             Collections.emptyMap(),
-            new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
+            Collections.singletonList(new 
TimeSeriesOperand(pathMap.get("root.sg.*.s1")))));
     groupByLevelDescriptorList.add(
         new CrossSeriesAggregationDescriptor(
             TAggregationType.AVG.name().toLowerCase(),
@@ -109,7 +109,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d2.s1"))),
             2,
             Collections.emptyMap(),
-            new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
+            Collections.singletonList(new 
TimeSeriesOperand(pathMap.get("root.sg.*.s1")))));
     groupByLevelDescriptorList.add(
         new CrossSeriesAggregationDescriptor(
             TAggregationType.COUNT.name().toLowerCase(),
@@ -119,7 +119,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d1.s1"))),
             2,
             Collections.emptyMap(),
-            new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
+            Collections.singletonList(new 
TimeSeriesOperand(pathMap.get("root.sg.*.s1")))));
     groupByLevelDescriptorList.add(
         new CrossSeriesAggregationDescriptor(
             TAggregationType.AVG.name().toLowerCase(),
@@ -129,7 +129,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d2.s1"))),
             2,
             Collections.emptyMap(),
-            new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
+            Collections.singletonList(new 
TimeSeriesOperand(pathMap.get("root.sg.*.s1")))));
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java
index ca227932c71..b60372636ec 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java
@@ -751,7 +751,7 @@ public class LogicalPlannerTest {
           
tagValuesToAggregationDescriptors.get(Collections.singletonList("value1"));
       Assert.assertEquals(1, descriptors.size());
       CrossSeriesAggregationDescriptor descriptor = descriptors.get(0);
-      Assert.assertEquals("s1", descriptor.getOutputExpression().toString());
+      Assert.assertEquals("s1", 
descriptor.getOutputExpressions().get(0).toString());
       Assert.assertEquals(TAggregationType.MAX_VALUE, 
descriptor.getAggregationType());
       Assert.assertEquals(AggregationStep.FINAL, descriptor.getStep());
       Assert.assertEquals(3, descriptor.getInputExpressions().size());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java
index 07d89735810..5bd54736c73 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java
@@ -584,7 +584,8 @@ public class QueryLogicalPlanUtil {
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -592,7 +593,8 @@ public class QueryLogicalPlanUtil {
                         new 
TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))),
                     1,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1"))),
+                    Collections.singletonList(
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.MAX_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -601,7 +603,8 @@ public class QueryLogicalPlanUtil {
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s2"))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.*.s2")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.MAX_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -609,7 +612,8 @@ public class QueryLogicalPlanUtil {
                         new 
TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))),
                     1,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s2"))),
+                    Collections.singletonList(
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.*.*.s2")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.LAST_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -618,7 +622,8 @@ public class QueryLogicalPlanUtil {
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.LAST_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -626,7 +631,8 @@ public class QueryLogicalPlanUtil {
                         new 
TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))),
                     1,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))),
+                    Collections.singletonList(
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1"))))),
             null,
             Ordering.DESC);
 
@@ -843,7 +849,8 @@ public class QueryLogicalPlanUtil {
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.MAX_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -852,7 +859,8 @@ public class QueryLogicalPlanUtil {
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s2"))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.*.s2")))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.LAST_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
@@ -861,7 +869,8 @@ public class QueryLogicalPlanUtil {
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))),
+                    Collections.singletonList(
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))))),
             null,
             Ordering.DESC);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
index 673fb749960..788c70917b8 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
@@ -304,7 +304,8 @@ public class AggregationDistributionTest {
                         new TimeSeriesOperand(new PartialPath(d2s1Path))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new PartialPath(groupedPath))))),
             null,
             Ordering.ASC);
     Analysis analysis = Util.constructAnalysis();
@@ -344,7 +345,8 @@ public class AggregationDistributionTest {
                         new TimeSeriesOperand(new PartialPath(d4s1Path))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new PartialPath(groupedPath))))),
             null,
             Ordering.ASC);
     Analysis analysis = Util.constructAnalysis();
@@ -421,7 +423,8 @@ public class AggregationDistributionTest {
                         new TimeSeriesOperand(new PartialPath(d4s1Path))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new PartialPath(groupedPath))))),
             null,
             Ordering.ASC);
 
@@ -503,14 +506,16 @@ public class AggregationDistributionTest {
                     Collections.singletonList(new TimeSeriesOperand(new 
PartialPath(d1s1Path))),
                     1,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath(groupedPathS1)))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new 
PartialPath(d1s2Path))),
                     1,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath(groupedPathS2))))),
             null,
             Ordering.ASC);
     Analysis analysis = Util.constructAnalysis();
@@ -569,14 +574,16 @@ public class AggregationDistributionTest {
                         new TimeSeriesOperand(new PartialPath(d2s1Path))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath(groupedPathS1)))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new 
PartialPath(d1s2Path))),
                     1,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath(groupedPathS2))))),
             null,
             Ordering.ASC);
     Analysis analysis = Util.constructAnalysis();
@@ -665,14 +672,16 @@ public class AggregationDistributionTest {
                         new TimeSeriesOperand(new PartialPath(d2s1Path))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath(groupedPathS1)))),
                 new CrossSeriesAggregationDescriptor(
                     TAggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new 
PartialPath(d1s2Path))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath(groupedPathS2))))),
             null,
             Ordering.ASC);
     Analysis analysis = Util.constructAnalysis();
@@ -840,7 +849,7 @@ public class AggregationDistributionTest {
     List<CrossSeriesAggregationDescriptor> descriptors = 
node.getGroupByLevelDescriptors();
     assertEquals(expected.size(), descriptors.size());
     for (CrossSeriesAggregationDescriptor descriptor : descriptors) {
-      String outputExpression = 
descriptor.getOutputExpression().getExpressionString();
+      String outputExpression = 
descriptor.getOutputExpressions().get(0).getExpressionString();
       assertEquals(expected.get(outputExpression).size(), 
descriptor.getInputExpressions().size());
       for (Expression inputExpression : descriptor.getInputExpressions()) {
         
assertTrue(expected.get(outputExpression).contains(inputExpression.getExpressionString()));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java
index 6d379d72f17..c7cff7ea412 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java
@@ -97,7 +97,8 @@ public class GroupByLevelNodeSerdeTest {
                         new TimeSeriesOperand(new 
PartialPath("root.sg.d2.s1"))),
                     2,
                     Collections.emptyMap(),
-                    new TimeSeriesOperand(new PartialPath("root.sg.*.s1")))),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath("root.sg.*.s1"))))),
             groupByTimeParameter,
             Ordering.ASC);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java
index b7e879681b2..3d307a51306 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java
@@ -67,11 +67,12 @@ public class GroupByTagNodeSerdeTest {
             Collections.singletonList(new TimeSeriesOperand(new 
PartialPath("root.sg.d1.s1"))),
             1,
             Collections.emptyMap(),
-            new FunctionExpression(
-                "max_time",
-                new LinkedHashMap<>(),
-                Collections.singletonList(
-                    new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")))));
+            Collections.singletonList(
+                new FunctionExpression(
+                    "max_time",
+                    new LinkedHashMap<>(),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath("root.sg.d1.s1"))))));
     CrossSeriesAggregationDescriptor s1Avg =
         new CrossSeriesAggregationDescriptor(
             TAggregationType.AVG.name().toLowerCase(),
@@ -79,11 +80,12 @@ public class GroupByTagNodeSerdeTest {
             Collections.singletonList(new TimeSeriesOperand(new 
PartialPath("root.sg.d1.s1"))),
             1,
             Collections.emptyMap(),
-            new FunctionExpression(
-                "avg",
-                new LinkedHashMap<>(),
-                Collections.singletonList(
-                    new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")))));
+            Collections.singletonList(
+                new FunctionExpression(
+                    "avg",
+                    new LinkedHashMap<>(),
+                    Collections.singletonList(
+                        new TimeSeriesOperand(new 
PartialPath("root.sg.d1.s1"))))));
     AggregationDescriptor s1MaxTimePartial =
         new AggregationDescriptor(
             TAggregationType.MAX_TIME.name().toLowerCase(),

Reply via email to