This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch max_by in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 05b2b6dc3ead03d0d6205b5419a578abdac70d02 Author: lancelly <[email protected]> AuthorDate: Fri Jan 26 20:23:09 2024 +0800 Add slidingWindow IT --- .../it/aggregation/IoTDBAggregationByLevelIT.java | 36 +++++++++++++++++++ .../db/it/aggregation/maxby/IoTDBMaxByIT.java | 42 ++++++++++++---------- .../execution/aggregation/MaxByAccumulator.java | 8 ++--- .../SlidingWindowAggregatorFactory.java | 21 +++++++++++ 4 files changed, 85 insertions(+), 22 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java index 5f0beea7dcc..4207e21c78f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java @@ -282,6 +282,42 @@ public class IoTDBAggregationByLevelIT { cnt++; } } + + + try (ResultSet resultSet = + statement.executeQuery( + "select max_by(temperature) from root.sg1.* GROUP BY level=0")) { + while (resultSet.next()) { + String ans = + resultSet.getString(lastValue("root.*.d1.temperature")) + + "," + + resultSet.getString(lastValue("root.*.d2.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d1.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d2.temperature")); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + } + + + try (ResultSet resultSet = + statement.executeQuery( + "select last_value(temperature), max_value(temperature) from root.sg1.* GROUP BY level=2")) { + while (resultSet.next()) { + String ans = + resultSet.getString(lastValue("root.*.d1.temperature")) + + "," + + resultSet.getString(lastValue("root.*.d2.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d1.temperature")) + + "," + + resultSet.getString(maxValue("root.*.d2.temperature")); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + } Assert.assertEquals(retArray.length, cnt); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java index 1cde3f239b8..29314a95f53 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java @@ -355,27 +355,33 @@ public class IoTDBMaxByIT { public void testMaxByWithSlidingWindow() { try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { - String[] expectedHeader = - new String[] { - TIMESTAMP_STR, - "max_by(root.db.d1.x1, root.db.d1.y2)", - "max_by(root.db.d1.x2, root.db.d1.y2)", - "max_by(root.db.d1.x3, root.db.d1.y2)", - "max_by(root.db.d1.x4, root.db.d1.y2)", - "max_by(root.db.d1.x5, root.db.d1.y2)", - "max_by(root.db.d1.x6, root.db.d1.y2)", - }; String[] retArray = new String[] { - "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3," + "0,3,3,3.0,3.0,false,3,", + "2,null,null,null,null,null,null,", + "4,null,null,null,null,null,null,", + "6,3,3,3.0,3.0,false,3,", + "8,3,3,3.0,3.0,false,3," }; - String y = "y2"; - resultSetEqualTest( - String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms)", - y, y, y, y, y, y), - expectedHeader, - retArray); + String[] yArray = new String[] {"y1", "y2", "y3", "y4"}; + for (String y : yArray) { + String[] expectedHeader = + new String[] { + TIMESTAMP_STR, + String.format("max_by(root.db.d1.x1, root.db.d1.%s)", y), + String.format("max_by(root.db.d1.x2, root.db.d1.%s)", y), + String.format("max_by(root.db.d1.x3, root.db.d1.%s)", y), + String.format("max_by(root.db.d1.x4, root.db.d1.%s)", y), + String.format("max_by(root.db.d1.x5, root.db.d1.%s)", y), + String.format("max_by(root.db.d1.x6, root.db.d1.%s)", y), + }; + resultSetEqualTest( + String.format( + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms,2ms)", + y, y, y, y, y, y), + expectedHeader, + retArray); + } } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java index 47092e43962..922acc45aa3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java @@ -341,25 +341,25 @@ public class MaxByAccumulator implements Accumulator { ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0]; switch (yDataType) { case INT32: - int intMaxVal = BytesUtils.bytesToInt(bytes); + int intMaxVal = BytesUtils.bytesToInt(bytes, 0); offset += Integer.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); updateIntResult(intMaxVal, columnBuilder.build(), 0); break; case INT64: - long longMaxVal = BytesUtils.bytesToLong(bytes); + long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0); offset += Long.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); updateLongResult(longMaxVal, columnBuilder.build(), 0); break; case FLOAT: - float floatMaxVal = BytesUtils.bytesToFloat(bytes); + float floatMaxVal = BytesUtils.bytesToFloat(bytes, 0); offset += Float.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); updateFloatResult(floatMaxVal, columnBuilder.build(), 0); break; case DOUBLE: - double doubleMaxVal = BytesUtils.bytesToDouble(bytes); + double doubleMaxVal = BytesUtils.bytesToDouble(bytes, 0); offset += Long.BYTES; readXFromBytesIntermediateInput(bytes, offset, columnBuilder); updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java index 53e477be245..cfc6461d1bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationSt import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.utils.BytesUtils; import java.util.Comparator; import java.util.EnumMap; @@ -47,6 +48,9 @@ public class SlidingWindowAggregatorFactory { private static final Map<TSDataType, Comparator<Column>> extremeComparators = new EnumMap<>(TSDataType.class); + private static final Map<TSDataType, Comparator<Column>> maxByComparators = + new EnumMap<>(TSDataType.class); + static { // return a value greater than 0 if o1 is numerically greater than o2 maxComparators.put(TSDataType.INT32, Comparator.comparingInt(o -> o.getInt(0))); @@ -115,6 +119,20 @@ public class SlidingWindowAggregatorFactory { } return -1; }); + // Intermediate Value of maxBy is a byte array: | y | xNull | x | + maxByComparators.put( + TSDataType.INT32, + Comparator.comparingInt(o -> BytesUtils.bytesToInt(o.getBinary(0).getValues(), 0))); + maxByComparators.put( + TSDataType.INT64, + Comparator.comparingLong( + o -> BytesUtils.bytesToLongFromOffset(o.getBinary(0).getValues(), Long.BYTES, 0))); + maxByComparators.put( + TSDataType.FLOAT, + Comparator.comparing(o -> BytesUtils.bytesToFloat(o.getBinary(0).getValues(), 0))); + maxByComparators.put( + TSDataType.DOUBLE, + Comparator.comparingDouble(o -> BytesUtils.bytesToDouble(o.getBinary(0).getValues(), 0))); } public static SlidingWindowAggregator createSlidingWindowAggregator( @@ -159,6 +177,9 @@ public class SlidingWindowAggregatorFactory { return !ascending ? new NormalQueueSlidingWindowAggregator(accumulator, inputLocationList, step) : new EmptyQueueSlidingWindowAggregator(accumulator, inputLocationList, step); + case MAX_BY: + return new MonotonicQueueSlidingWindowAggregator( + accumulator, inputLocationList, step, maxByComparators.get(dataTypes.get(1))); case COUNT_IF: throw new SemanticException("COUNT_IF with slidingWindow is not supported now"); case TIME_DURATION:
