This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch max_by in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a0031a8eb83f9d080b5cfce2a57076aa40ac673c Author: lancelly <[email protected]> AuthorDate: Sat Jan 27 19:21:34 2024 +0800 Support GroupByLevel and add related ITs --- .../it/aggregation/IoTDBAggregationByLevelIT.java | 37 ++++---- .../db/it/aggregation/IoTDBAggregationIT.java | 5 +- .../maxby/IoTDBMaxByAlignedSeriesIT.java | 8 ++ .../db/it/aggregation/maxby/IoTDBMaxByIT.java | 102 +++++++++++++++++---- .../execution/aggregation/AccumulatorFactory.java | 2 +- .../execution/aggregation/MaxByAccumulator.java | 59 +++++++----- .../SlidingWindowAggregatorFactory.java | 12 ++- .../ReplaceRawPathWithGroupedPathVisitor.java | 7 +- .../cartesian/BindSchemaForPredicateVisitor.java | 6 +- .../plan/planner/LogicalPlanBuilder.java | 41 ++++++--- .../plan/planner/OperatorTreeGenerator.java | 10 +- .../plan/planner/distribution/SourceRewriter.java | 17 ++-- .../CrossSeriesAggregationDescriptor.java | 67 ++++++++++---- .../plan/analyze/AggregationDescriptorTest.java | 8 +- .../plan/planner/LogicalPlannerTest.java | 2 +- .../plan/planner/QueryLogicalPlanUtil.java | 27 ++++-- .../distribution/AggregationDistributionTest.java | 29 ++++-- .../node/process/GroupByLevelNodeSerdeTest.java | 3 +- .../node/process/GroupByTagNodeSerdeTest.java | 22 +++-- 19 files changed, 314 insertions(+), 150 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java index 4207e21c78f..e61e81b9ccb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java @@ -283,37 +283,34 @@ public class IoTDBAggregationByLevelIT { } } - try (ResultSet resultSet = - statement.executeQuery( - "select max_by(temperature) from root.sg1.* GROUP BY level=0")) { + statement.executeQuery("select max_by(temperature) from root.sg1.* GROUP BY level=0")) { while (resultSet.next()) { String ans = - resultSet.getString(lastValue("root.*.d1.temperature")) - + "," - + resultSet.getString(lastValue("root.*.d2.temperature")) - + "," - + resultSet.getString(maxValue("root.*.d1.temperature")) - + "," - + resultSet.getString(maxValue("root.*.d2.temperature")); + resultSet.getString(lastValue("root.*.d1.temperature")) + + "," + + resultSet.getString(lastValue("root.*.d2.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d1.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d2.temperature")); Assert.assertEquals(retArray[cnt], ans); cnt++; } } - try (ResultSet resultSet = - statement.executeQuery( - "select last_value(temperature), max_value(temperature) from root.sg1.* GROUP BY level=2")) { + statement.executeQuery( + "select last_value(temperature), max_value(temperature) from root.sg1.* GROUP BY level=2")) { while (resultSet.next()) { String ans = - resultSet.getString(lastValue("root.*.d1.temperature")) - + "," - + resultSet.getString(lastValue("root.*.d2.temperature")) - + "," - + resultSet.getString(maxValue("root.*.d1.temperature")) - + "," - + resultSet.getString(maxValue("root.*.d2.temperature")); + resultSet.getString(lastValue("root.*.d1.temperature")) + + "," + + resultSet.getString(lastValue("root.*.d2.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d1.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d2.temperature")); Assert.assertEquals(retArray[cnt], ans); cnt++; } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java index 1dd8fca19cd..3bf0ef7fc84 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java @@ -992,7 +992,7 @@ public class IoTDBAggregationIT { int cnt; try (ResultSet resultSet = statement.executeQuery( - "SELECT max_value(time, s0) " + "SELECT max_by(time, s0) " + "FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000")) { cnt = 0; while (resultSet.next()) { @@ -1005,8 +1005,7 @@ public class IoTDBAggregationIT { } try (ResultSet resultSet = - statement.executeQuery( - "SELECT max_value(time,s0) FROM root.vehicle.d0 WHERE time < 2500")) { + statement.executeQuery("SELECT max_by(time,s0) FROM root.vehicle.d0 WHERE time < 2500")) { while (resultSet.next()) { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(maxBy("time", d0s0)); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java index 648b9e7b388..05af62d8552 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java @@ -40,6 +40,10 @@ public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT { "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 9, 9, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 9, 9, false, \"4\")", "flush", // For Align By Device "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", @@ -52,6 +56,10 @@ public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT { "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 8, 8, 8, 8, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 8, 8, 8, 8, false, \"4\")", }; @BeforeClass diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java index 29314a95f53..cfad1e26211 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java @@ -73,6 +73,10 @@ public class IoTDBMaxByIT { "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 9, 9, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 9, 9, false, \"4\")", "flush", // For Align By Device @@ -96,6 +100,10 @@ public class IoTDBMaxByIT { "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 9, 9, false, \"1\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 9, 9, false, \"1\")", "flush" }; @@ -321,7 +329,7 @@ public class IoTDBMaxByIT { } @Test - public void testMaxByWithGroupBy() { + public void testMaxByWithGroupByTime() { try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { String[] expectedHeader = @@ -334,17 +342,33 @@ public class IoTDBMaxByIT { "max_by(root.db.d1.x5, root.db.d1.y2)", "max_by(root.db.d1.x6, root.db.d1.y2)", }; - String[] retArray = + String y = "y2"; + // order by time ASC + String[] retArray1 = new String[] { "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3," }; - String y = "y2"; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms)", + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms) ", y, y, y, y, y, y), expectedHeader, - retArray); + retArray1); + + // order by time DESC + String[] retArray2 = + new String[] { + "12,3,3,3.0,3.0,false,3,", + "8,3,3,3.0,3.0,false,3,", + "4,null,null,null,null,null,null,", + "0,3,3,3.0,3.0,false,3," + }; + resultSetEqualTest( + String.format( + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,13),4ms) order by time desc", + y, y, y, y, y, y), + expectedHeader, + retArray2); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -377,7 +401,7 @@ public class IoTDBMaxByIT { }; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms,2ms)", + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms,2ms) ", y, y, y, y, y, y), expectedHeader, retArray); @@ -388,16 +412,62 @@ public class IoTDBMaxByIT { } } - // test max_by different types of x - // test max_by time - // test max_by with expression - // test max_by align by device - // test max_by group by time - // test max_by sliding window - // test max_by aligned series - // test max_by multi data region - // test max_by group by level - // test max_by having + @Test + public void testMaxByWithHaving() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + String[] expectedHeader = + new String[] { + TIMESTAMP_STR, + "max_by(root.db.d1.x1, root.db.d1.y2)", + "max_by(root.db.d1.x2, root.db.d1.y2)", + "max_by(root.db.d1.x3, root.db.d1.y2)", + "max_by(root.db.d1.x4, root.db.d1.y2)", + "max_by(root.db.d1.x5, root.db.d1.y2)", + "max_by(root.db.d1.x6, root.db.d1.y2)", + }; + String[] retArray = new String[] {"8,3,3,3.0,3.0,false,3,"}; + String y = "y2"; + resultSetEqualTest( + String.format( + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms) having max_by(time, %s) > 4", + y, y, y, y, y, y, y), + expectedHeader, + retArray); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithGroupByLevel() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + String[] retArray = new String[] {"3,3,3.0,3.0,false,3,"}; + String[] yArray = new String[] {"y1", "y2", "y3", "y4"}; + for (String y : yArray) { + String[] expectedHeader = + new String[] { + String.format("max_by(root.*.*.x1, root.*.*.%s)", y), + String.format("max_by(root.*.*.x2, root.*.*.%s)", y), + String.format("max_by(root.*.*.x3, root.*.*.%s)", y), + String.format("max_by(root.*.*.x4, root.*.*.%s)", y), + String.format("max_by(root.*.*.x5, root.*.*.%s)", y), + String.format("max_by(root.*.*.x6, root.*.*.%s)", y), + }; + resultSetEqualTest( + String.format( + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.** group by level = 0", + y, y, y, y, y, y), + expectedHeader, + retArray); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } /** @return yInput -> expectedHeader */ private Map<String, String[]> generateExpectedHeadersForMaxByTest( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java index 9c0b7ce7fcb..68f233ad8d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java @@ -133,7 +133,7 @@ public class AccumulatorFactory { } } - private static boolean isMultiInputAggregation(TAggregationType aggregationType) { + public static boolean isMultiInputAggregation(TAggregationType aggregationType) { switch (aggregationType) { case MAX_BY: return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java index 922acc45aa3..7f101e982d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java @@ -52,6 +52,8 @@ public class MaxByAccumulator implements Accumulator { private boolean initResult; + private long yTimeStamp = Long.MAX_VALUE; + private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy: %s"; public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) { @@ -138,6 +140,7 @@ public class MaxByAccumulator implements Accumulator { xNull = true; this.xResult.reset(); this.yMaxValue.reset(); + yTimeStamp = Long.MAX_VALUE; } @Override @@ -161,14 +164,17 @@ public class MaxByAccumulator implements Accumulator { continue; } if (!column[2].isNull(i)) { - updateIntResult(column[2].getInt(i), column[1], i); + updateIntResult(column[0].getLong(i), column[2].getInt(i), column[1], i); } } } - private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) { - if (!initResult || yMaxVal > yMaxValue.getInt()) { + private void updateIntResult(long time, int yMaxVal, Column xColumn, int xIndex) { + if (!initResult + || yMaxVal > yMaxValue.getInt() + || (yMaxVal == yMaxValue.getInt() && time < yTimeStamp)) { initResult = true; + yTimeStamp = time; yMaxValue.setInt(yMaxVal); updateX(xColumn, xIndex); } @@ -180,14 +186,17 @@ public class MaxByAccumulator implements Accumulator { continue; } if (!column[2].isNull(i)) { - updateLongResult(column[2].getLong(i), column[1], i); + updateLongResult(column[0].getLong(i), column[2].getLong(i), column[1], i); } } } - private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) { - if (!initResult || yMaxVal > yMaxValue.getLong()) { + private void updateLongResult(long time, long yMaxVal, Column xColumn, int xIndex) { + if (!initResult + || yMaxVal > yMaxValue.getLong() + || (yMaxVal == yMaxValue.getLong() && time < yTimeStamp)) { initResult = true; + yTimeStamp = time; yMaxValue.setLong(yMaxVal); updateX(xColumn, xIndex); } @@ -199,14 +208,17 @@ public class MaxByAccumulator implements Accumulator { continue; } if (!column[2].isNull(i)) { - updateFloatResult(column[2].getFloat(i), column[1], i); + updateFloatResult(column[0].getLong(i), column[2].getFloat(i), column[1], i); } } } - private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) { - if (!initResult || yMaxVal > yMaxValue.getFloat()) { + private void updateFloatResult(long time, float yMaxVal, Column xColumn, int xIndex) { + if (!initResult + || yMaxVal > yMaxValue.getFloat() + || (yMaxVal == yMaxValue.getFloat() && time < yTimeStamp)) { initResult = true; + yTimeStamp = time; yMaxValue.setFloat(yMaxVal); updateX(xColumn, xIndex); } @@ -218,14 +230,17 @@ public class MaxByAccumulator implements Accumulator { continue; } if (!column[2].isNull(i)) { - updateDoubleResult(column[2].getDouble(i), column[1], i); + updateDoubleResult(column[0].getLong(i), column[2].getDouble(i), column[1], i); } } } - private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) { - if (!initResult || yMaxVal > yMaxValue.getDouble()) { + private void updateDoubleResult(long time, double yMaxVal, Column xColumn, int xIndex) { + if (!initResult + || yMaxVal > yMaxValue.getDouble() + || (yMaxVal == yMaxValue.getDouble() && time < yTimeStamp)) { initResult = true; + yTimeStamp = time; yMaxValue.setDouble(yMaxVal); updateX(xColumn, xIndex); } @@ -295,6 +310,7 @@ public class MaxByAccumulator implements Accumulator { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); try { + dataOutputStream.writeLong(yTimeStamp); writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream); dataOutputStream.writeBoolean(xNull); if (!xNull) { @@ -335,34 +351,35 @@ public class MaxByAccumulator implements Accumulator { } private void updateFromBytesIntermediateInput(byte[] bytes) { - int offset = 0; + long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0); + int offset = Long.BYTES; // Use Column to store x value TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(xDataType)); ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0]; switch (yDataType) { case INT32: - int intMaxVal = BytesUtils.bytesToInt(bytes, 0); + int intMaxVal = BytesUtils.bytesToInt(bytes, offset); offset += Integer.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateIntResult(intMaxVal, columnBuilder.build(), 0); + updateIntResult(time, intMaxVal, columnBuilder.build(), 0); break; case INT64: - long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0); + long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset); offset += Long.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateLongResult(longMaxVal, columnBuilder.build(), 0); + updateLongResult(time, longMaxVal, columnBuilder.build(), 0); break; case FLOAT: - float floatMaxVal = BytesUtils.bytesToFloat(bytes, 0); + float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset); offset += Float.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateFloatResult(floatMaxVal, columnBuilder.build(), 0); + updateFloatResult(time, floatMaxVal, columnBuilder.build(), 0); break; case DOUBLE: - double doubleMaxVal = BytesUtils.bytesToDouble(bytes, 0); + double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset); offset += Long.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0); + updateDoubleResult(time, doubleMaxVal, columnBuilder.build(), 0); break; case TEXT: case BOOLEAN: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java index cfc6461d1bc..ce58e387cc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java @@ -122,17 +122,21 @@ public class SlidingWindowAggregatorFactory { // Intermediate Value of maxBy is a byte array: | y | xNull | x | maxByComparators.put( TSDataType.INT32, - Comparator.comparingInt(o -> BytesUtils.bytesToInt(o.getBinary(0).getValues(), 0))); + Comparator.comparingInt( + o -> BytesUtils.bytesToInt(o.getBinary(0).getValues(), Long.BYTES))); maxByComparators.put( TSDataType.INT64, Comparator.comparingLong( - o -> BytesUtils.bytesToLongFromOffset(o.getBinary(0).getValues(), Long.BYTES, 0))); + o -> + BytesUtils.bytesToLongFromOffset( + o.getBinary(0).getValues(), Long.BYTES, Long.BYTES))); maxByComparators.put( TSDataType.FLOAT, - Comparator.comparing(o -> BytesUtils.bytesToFloat(o.getBinary(0).getValues(), 0))); + Comparator.comparing(o -> BytesUtils.bytesToFloat(o.getBinary(0).getValues(), Long.BYTES))); maxByComparators.put( TSDataType.DOUBLE, - Comparator.comparingDouble(o -> BytesUtils.bytesToDouble(o.getBinary(0).getValues(), 0))); + Comparator.comparingDouble( + o -> BytesUtils.bytesToDouble(o.getBinary(0).getValues(), Long.BYTES))); } public static SlidingWindowAggregator createSlidingWindowAggregator( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java index 1ec879a7f81..424e7287741 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/ReplaceRawPathWithGroupedPathVisitor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; +import org.apache.iotdb.db.utils.constant.SqlConstant; import java.util.ArrayList; import java.util.List; @@ -43,10 +44,8 @@ public class ReplaceRawPathWithGroupedPathVisitor for (Expression childExpression : functionExpression.getExpressions()) { childrenExpressions.add(process(childExpression, context)); - // We just process first input Expression of AggregationFunction. - // If AggregationFunction need more than one input series, - // we need to reconsider the process of it - if (functionExpression.isBuiltInAggregationFunctionExpression()) { + // We just process first input Expression of Count_IF + if (SqlConstant.COUNT_IF.equalsIgnoreCase(functionExpression.getFunctionName())) { List<Expression> children = functionExpression.getExpressions(); for (int i = 1; i < children.size(); i++) { childrenExpressions.add(children.get(i)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java index bca177d4505..968d6b51fd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java @@ -88,11 +88,9 @@ public class BindSchemaForPredicateVisitor suffixExpression, new Context(context.getPrefixPaths(), context.getSchemaTree(), false))); - // We just process first input Expression of AggregationFunction, + // We just process first input Expression of Count_IF, // keep other input Expressions as origin and bind Type - // If AggregationFunction need more than one input series, - // we need to reconsider the process of it - if (predicate.isBuiltInAggregationFunctionExpression()) { + if (SqlConstant.COUNT_IF.equalsIgnoreCase(predicate.getFunctionName())) { List<Expression> children = predicate.getExpressions(); bindTypeForAggregationNonSeriesInputExpressions( predicate.getFunctionName(), children, extendedExpressions); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 68dff31c74f..f6000913991 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; @@ -701,12 +702,8 @@ public class LogicalPlanBuilder { List<String> partialAggregationsNames = SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType()); String inputExpressionStr = getInputExpressionString(aggregationDescriptor); - for (String partialAggregationName : partialAggregationsNames) { - TSDataType aggregationType = SchemaUtils.getAggregationType(partialAggregationName); - typeProvider.setType( - String.format("%s(%s)", partialAggregationName, inputExpressionStr), - aggregationType == null ? typeProvider.getType(inputExpressionStr) : aggregationType); - } + partialAggregationsNames.forEach( + x -> setTypeForPartialAggregation(typeProvider, x, inputExpressionStr)); } private static String getInputExpressionString(AggregationDescriptor aggregationDescriptor) { @@ -718,15 +715,30 @@ public class LogicalPlanBuilder { } } + private static void setTypeForPartialAggregation( + TypeProvider typeProvider, String partialAggregationName, String inputExpressionStr) { + TSDataType aggregationType = SchemaUtils.getAggregationType(partialAggregationName); + typeProvider.setType( + String.format("%s(%s)", partialAggregationName, inputExpressionStr), + aggregationType == null ? typeProvider.getType(inputExpressionStr) : aggregationType); + } + public static void updateTypeProviderByPartialAggregation( CrossSeriesAggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) { List<String> partialAggregationsNames = SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType()); - PartialPath path = ((TimeSeriesOperand) aggregationDescriptor.getOutputExpression()).getPath(); - for (String partialAggregationName : partialAggregationsNames) { - typeProvider.setType( - String.format("%s(%s)", partialAggregationName, path.getFullPath()), - SchemaUtils.getSeriesTypeByPath(path, partialAggregationName)); + if (!AccumulatorFactory.isMultiInputAggregation(aggregationDescriptor.getAggregationType())) { + PartialPath path = + ((TimeSeriesOperand) aggregationDescriptor.getOutputExpressions().get(0)).getPath(); + for (String partialAggregationName : partialAggregationsNames) { + typeProvider.setType( + String.format("%s(%s)", partialAggregationName, path.getFullPath()), + SchemaUtils.getSeriesTypeByPath(path, partialAggregationName)); + } + } else { + String inputExpressionStr = aggregationDescriptor.getOutputExpressionsAsBuilder().toString(); + partialAggregationsNames.forEach( + x -> setTypeForPartialAggregation(typeProvider, x, inputExpressionStr)); } } @@ -1027,12 +1039,13 @@ public class LogicalPlanBuilder { .collect(Collectors.toList()), entry.getValue().size(), ((FunctionExpression) entry.getKey()).getFunctionAttributes(), - entry.getKey().getExpressions().get(0))); + entry.getKey().getExpressions())); } updateTypeProvider(groupByLevelExpressions.keySet()); updateTypeProvider( groupByLevelDescriptors.stream() - .map(CrossSeriesAggregationDescriptor::getOutputExpression) + .map(CrossSeriesAggregationDescriptor::getOutputExpressions) + .flatMap(Collection::stream) .collect(Collectors.toList())); return new GroupByLevelNode( context.getQueryId().genPlanNodeId(), @@ -1070,7 +1083,7 @@ public class LogicalPlanBuilder { curStep, groupedTimeseriesOperands.get(expression), ((FunctionExpression) expression).getFunctionAttributes(), - expression.getExpressions().get(0)); + expression.getExpressions()); aggregationDescriptors.add(aggregationDescriptor); added = true; break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 7cee314068f..081edad29fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -265,6 +265,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -1477,9 +1478,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getGroupByLevelDescriptors(); for (CrossSeriesAggregationDescriptor descriptor : aggregationDescriptors) { List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); + // Use the first set of InputExpression List<TSDataType> inputDataTypes = - descriptor.getInputExpressions().stream() - .map(x -> context.getTypeProvider().getType(x.getExpressionString())) + IntStream.range(0, descriptor.getExpressionNumOfOneInput()) + .mapToObj( + x -> + context + .getTypeProvider() + .getType(descriptor.getInputExpressions().get(x).getExpressionString())) .collect(Collectors.toList()); aggregators.add( new Aggregator( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 4cf27f21fb9..f1268ec2db8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -1117,9 +1117,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte for (Expression exp : originalDescriptor.getInputExpressions()) { columnNameToExpression.put(exp.getExpressionString(), exp); } - columnNameToExpression.put( - originalDescriptor.getOutputExpression().getExpressionString(), - originalDescriptor.getOutputExpression()); + originalDescriptor + .getOutputExpressions() + .forEach(x -> columnNameToExpression.put(x.getExpressionString(), x)); } context.setColumnNameToExpression(columnNameToExpression); @@ -1285,9 +1285,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte handle.getGroupByLevelDescriptors()) { Set<Expression> descriptorExpressions = new HashSet<>(); - if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) { - descriptorExpressions.add(originalDescriptor.getOutputExpression()); - } + originalDescriptor + .getOutputExpressions() + .forEach( + x -> { + if (childrenExpressionSet.contains(x)) { + descriptorExpressions.add(x); + } + }); for (Expression exp : originalDescriptor.getInputExpressions()) { if (childrenExpressionSet.contains(exp)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java index f03285e599a..ac33b3ee2ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.parameter; +import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.utils.constant.SqlConstant; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -35,7 +36,7 @@ import java.util.stream.Collectors; public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { - private final Expression outputExpression; + private final List<Expression> outputExpressions; /** * Records how many Expressions are in one input, used for calculation of inputColumnNames @@ -51,9 +52,9 @@ public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { List<Expression> inputExpressions, int numberOfInput, Map<String, String> inputAttributes, - Expression outputExpression) { + List<Expression> outputExpressions) { super(aggregationFuncName, step, inputExpressions, inputAttributes); - this.outputExpression = outputExpression; + this.outputExpressions = outputExpressions; this.expressionNumOfOneInput = inputExpressions.size() / numberOfInput; } @@ -66,23 +67,23 @@ public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { AggregationStep step, List<Expression> inputExpressions, Map<String, String> inputAttributes, - Expression outputExpression) { + List<Expression> outputExpressions) { super(aggregationFuncName, step, inputExpressions, inputAttributes); - this.outputExpression = outputExpression; + this.outputExpressions = outputExpressions; this.expressionNumOfOneInput = 1; } public CrossSeriesAggregationDescriptor( AggregationDescriptor aggregationDescriptor, - Expression outputExpression, + List<Expression> outputExpressions, int expressionNumOfOneInput) { super(aggregationDescriptor); - this.outputExpression = outputExpression; + this.outputExpressions = outputExpressions; this.expressionNumOfOneInput = expressionNumOfOneInput; } - public Expression getOutputExpression() { - return outputExpression; + public List<Expression> getOutputExpressions() { + return outputExpressions; } /** @@ -97,11 +98,16 @@ public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { @Override public String getParametersString() { if (parametersString == null) { - StringBuilder builder = new StringBuilder(outputExpression.getExpressionString()); - for (int i = 1; i < expressionNumOfOneInput; i++) { - builder.append(", ").append(inputExpressions.get(i).getExpressionString()); + StringBuilder builder; + if (TAggregationType.COUNT_IF.equals(aggregationType)) { + builder = new StringBuilder(outputExpressions.get(0).getExpressionString()); + for (int i = 1; i < expressionNumOfOneInput; i++) { + builder.append(", ").append(inputExpressions.get(i).getExpressionString()); + } + appendAttributes(builder); + } else { + builder = getOutputExpressionsAsBuilder(); } - appendAttributes(builder); parametersString = builder.toString(); } return parametersString; @@ -151,31 +157,48 @@ public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { return builder.toString(); } + public StringBuilder getOutputExpressionsAsBuilder() { + StringBuilder builder = new StringBuilder(outputExpressions.get(0).getExpressionString()); + for (int i = 1; i < outputExpressions.size(); i++) { + builder.append(", ").append(outputExpressions.get(i).getExpressionString()); + } + appendAttributes(builder); + return builder; + } + @Override public CrossSeriesAggregationDescriptor deepClone() { - return new CrossSeriesAggregationDescriptor(this, outputExpression, expressionNumOfOneInput); + return new CrossSeriesAggregationDescriptor(this, outputExpressions, expressionNumOfOneInput); } @Override public void serialize(ByteBuffer byteBuffer) { super.serialize(byteBuffer); - Expression.serialize(outputExpression, byteBuffer); + ReadWriteIOUtils.write(outputExpressions.size(), byteBuffer); + outputExpressions.forEach(x -> Expression.serialize(x, byteBuffer)); ReadWriteIOUtils.write(expressionNumOfOneInput, byteBuffer); } @Override public void serialize(DataOutputStream stream) throws IOException { super.serialize(stream); - Expression.serialize(outputExpression, stream); + ReadWriteIOUtils.write(outputExpressions.size(), stream); + for (Expression x : outputExpressions) { + Expression.serialize(x, stream); + } ReadWriteIOUtils.write(expressionNumOfOneInput, stream); } public static CrossSeriesAggregationDescriptor deserialize(ByteBuffer byteBuffer) { AggregationDescriptor aggregationDescriptor = AggregationDescriptor.deserialize(byteBuffer); - Expression outputExpression = Expression.deserialize(byteBuffer); + int size = ReadWriteIOUtils.readInt(byteBuffer); + List<Expression> outputExpressions = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + outputExpressions.add(Expression.deserialize(byteBuffer)); + } int expressionNumOfOneInput = ReadWriteIOUtils.readInt(byteBuffer); return new CrossSeriesAggregationDescriptor( - aggregationDescriptor, outputExpression, expressionNumOfOneInput); + aggregationDescriptor, outputExpressions, expressionNumOfOneInput); } @Override @@ -190,11 +213,15 @@ public class CrossSeriesAggregationDescriptor extends AggregationDescriptor { return false; } CrossSeriesAggregationDescriptor that = (CrossSeriesAggregationDescriptor) o; - return Objects.equals(outputExpression, that.outputExpression); + return Objects.equals(outputExpressions, that.outputExpressions); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), outputExpression); + return Objects.hash(super.hashCode(), outputExpressions); + } + + public int getExpressionNumOfOneInput() { + return expressionNumOfOneInput; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java index e109445da20..2e4c2daf9d7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AggregationDescriptorTest.java @@ -99,7 +99,7 @@ public class AggregationDescriptorTest { new TimeSeriesOperand(pathMap.get("root.sg.d1.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(pathMap.get("root.sg.*.s1")))); + Collections.singletonList(new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))))); groupByLevelDescriptorList.add( new CrossSeriesAggregationDescriptor( TAggregationType.AVG.name().toLowerCase(), @@ -109,7 +109,7 @@ public class AggregationDescriptorTest { new TimeSeriesOperand(pathMap.get("root.sg.d2.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(pathMap.get("root.sg.*.s1")))); + Collections.singletonList(new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))))); groupByLevelDescriptorList.add( new CrossSeriesAggregationDescriptor( TAggregationType.COUNT.name().toLowerCase(), @@ -119,7 +119,7 @@ public class AggregationDescriptorTest { new TimeSeriesOperand(pathMap.get("root.sg.d1.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(pathMap.get("root.sg.*.s1")))); + Collections.singletonList(new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))))); groupByLevelDescriptorList.add( new CrossSeriesAggregationDescriptor( TAggregationType.AVG.name().toLowerCase(), @@ -129,7 +129,7 @@ public class AggregationDescriptorTest { new TimeSeriesOperand(pathMap.get("root.sg.d2.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(pathMap.get("root.sg.*.s1")))); + Collections.singletonList(new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))))); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java index ca227932c71..b60372636ec 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlannerTest.java @@ -751,7 +751,7 @@ public class LogicalPlannerTest { tagValuesToAggregationDescriptors.get(Collections.singletonList("value1")); Assert.assertEquals(1, descriptors.size()); CrossSeriesAggregationDescriptor descriptor = descriptors.get(0); - Assert.assertEquals("s1", descriptor.getOutputExpression().toString()); + Assert.assertEquals("s1", descriptor.getOutputExpressions().get(0).toString()); Assert.assertEquals(TAggregationType.MAX_VALUE, descriptor.getAggregationType()); Assert.assertEquals(AggregationStep.FINAL, descriptor.getStep()); Assert.assertEquals(3, descriptor.getInputExpressions().size()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java index 07d89735810..5bd54736c73 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/QueryLogicalPlanUtil.java @@ -584,7 +584,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))), new CrossSeriesAggregationDescriptor( TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, @@ -592,7 +593,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))), 1, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))), new CrossSeriesAggregationDescriptor( TAggregationType.MAX_VALUE.name().toLowerCase(), AggregationStep.FINAL, @@ -601,7 +603,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.s2"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.s2")))), new CrossSeriesAggregationDescriptor( TAggregationType.MAX_VALUE.name().toLowerCase(), AggregationStep.FINAL, @@ -609,7 +612,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))), 1, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s2"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s2")))), new CrossSeriesAggregationDescriptor( TAggregationType.LAST_VALUE.name().toLowerCase(), AggregationStep.FINAL, @@ -618,7 +622,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))), new CrossSeriesAggregationDescriptor( TAggregationType.LAST_VALUE.name().toLowerCase(), AggregationStep.FINAL, @@ -626,7 +631,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))), 1, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1"))))), null, Ordering.DESC); @@ -843,7 +849,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))), new CrossSeriesAggregationDescriptor( TAggregationType.MAX_VALUE.name().toLowerCase(), AggregationStep.FINAL, @@ -852,7 +859,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.s2"))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.s2")))), new CrossSeriesAggregationDescriptor( TAggregationType.LAST_VALUE.name().toLowerCase(), AggregationStep.FINAL, @@ -861,7 +869,8 @@ public class QueryLogicalPlanUtil { new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))), + Collections.singletonList( + new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))))), null, Ordering.DESC); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java index 673fb749960..788c70917b8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java @@ -304,7 +304,8 @@ public class AggregationDistributionTest { new TimeSeriesOperand(new PartialPath(d2s1Path))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPath)))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPath))))), null, Ordering.ASC); Analysis analysis = Util.constructAnalysis(); @@ -344,7 +345,8 @@ public class AggregationDistributionTest { new TimeSeriesOperand(new PartialPath(d4s1Path))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPath)))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPath))))), null, Ordering.ASC); Analysis analysis = Util.constructAnalysis(); @@ -421,7 +423,8 @@ public class AggregationDistributionTest { new TimeSeriesOperand(new PartialPath(d4s1Path))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPath)))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPath))))), null, Ordering.ASC); @@ -503,14 +506,16 @@ public class AggregationDistributionTest { Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))), 1, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPathS1))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPathS1)))), new CrossSeriesAggregationDescriptor( TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))), 1, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPathS2)))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPathS2))))), null, Ordering.ASC); Analysis analysis = Util.constructAnalysis(); @@ -569,14 +574,16 @@ public class AggregationDistributionTest { new TimeSeriesOperand(new PartialPath(d2s1Path))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPathS1))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPathS1)))), new CrossSeriesAggregationDescriptor( TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))), 1, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPathS2)))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPathS2))))), null, Ordering.ASC); Analysis analysis = Util.constructAnalysis(); @@ -665,14 +672,16 @@ public class AggregationDistributionTest { new TimeSeriesOperand(new PartialPath(d2s1Path))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPathS1))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPathS1)))), new CrossSeriesAggregationDescriptor( TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath(groupedPathS2)))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath(groupedPathS2))))), null, Ordering.ASC); Analysis analysis = Util.constructAnalysis(); @@ -840,7 +849,7 @@ public class AggregationDistributionTest { List<CrossSeriesAggregationDescriptor> descriptors = node.getGroupByLevelDescriptors(); assertEquals(expected.size(), descriptors.size()); for (CrossSeriesAggregationDescriptor descriptor : descriptors) { - String outputExpression = descriptor.getOutputExpression().getExpressionString(); + String outputExpression = descriptor.getOutputExpressions().get(0).getExpressionString(); assertEquals(expected.get(outputExpression).size(), descriptor.getInputExpressions().size()); for (Expression inputExpression : descriptor.getInputExpressions()) { assertTrue(expected.get(outputExpression).contains(inputExpression.getExpressionString())); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java index 6d379d72f17..c7cff7ea412 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByLevelNodeSerdeTest.java @@ -97,7 +97,8 @@ public class GroupByLevelNodeSerdeTest { new TimeSeriesOperand(new PartialPath("root.sg.d2.s1"))), 2, Collections.emptyMap(), - new TimeSeriesOperand(new PartialPath("root.sg.*.s1")))), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath("root.sg.*.s1"))))), groupByTimeParameter, Ordering.ASC); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java index b7e879681b2..3d307a51306 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/GroupByTagNodeSerdeTest.java @@ -67,11 +67,12 @@ public class GroupByTagNodeSerdeTest { Collections.singletonList(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))), 1, Collections.emptyMap(), - new FunctionExpression( - "max_time", - new LinkedHashMap<>(), - Collections.singletonList( - new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))); + Collections.singletonList( + new FunctionExpression( + "max_time", + new LinkedHashMap<>(), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")))))); CrossSeriesAggregationDescriptor s1Avg = new CrossSeriesAggregationDescriptor( TAggregationType.AVG.name().toLowerCase(), @@ -79,11 +80,12 @@ public class GroupByTagNodeSerdeTest { Collections.singletonList(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))), 1, Collections.emptyMap(), - new FunctionExpression( - "avg", - new LinkedHashMap<>(), - Collections.singletonList( - new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))); + Collections.singletonList( + new FunctionExpression( + "avg", + new LinkedHashMap<>(), + Collections.singletonList( + new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")))))); AggregationDescriptor s1MaxTimePartial = new AggregationDescriptor( TAggregationType.MAX_TIME.name().toLowerCase(),
