This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch max_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 05b2b6dc3ead03d0d6205b5419a578abdac70d02
Author: lancelly <[email protected]>
AuthorDate: Fri Jan 26 20:23:09 2024 +0800

    Add slidingWindow IT
---
 .../it/aggregation/IoTDBAggregationByLevelIT.java  | 36 +++++++++++++++++++
 .../db/it/aggregation/maxby/IoTDBMaxByIT.java      | 42 ++++++++++++----------
 .../execution/aggregation/MaxByAccumulator.java    |  8 ++---
 .../SlidingWindowAggregatorFactory.java            | 21 +++++++++++
 4 files changed, 85 insertions(+), 22 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
index 5f0beea7dcc..4207e21c78f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
@@ -282,6 +282,42 @@ public class IoTDBAggregationByLevelIT {
           cnt++;
         }
       }
+
+
+      try (ResultSet resultSet =
+                   statement.executeQuery(
+                           "select max_by(temperature) from root.sg1.* GROUP 
BY level=0")) {
+        while (resultSet.next()) {
+          String ans =
+                  resultSet.getString(lastValue("root.*.d1.temperature"))
+                          + ","
+                          + 
resultSet.getString(lastValue("root.*.d2.temperature"))
+                          + ","
+                          + 
resultSet.getString(maxValue("root.*.d1.temperature"))
+                          + ","
+                          + 
resultSet.getString(maxValue("root.*.d2.temperature"));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+      }
+
+
+      try (ResultSet resultSet =
+                   statement.executeQuery(
+                           "select last_value(temperature), 
max_value(temperature) from root.sg1.* GROUP BY level=2")) {
+        while (resultSet.next()) {
+          String ans =
+                  resultSet.getString(lastValue("root.*.d1.temperature"))
+                          + ","
+                          + 
resultSet.getString(lastValue("root.*.d2.temperature"))
+                          + ","
+                          + 
resultSet.getString(maxValue("root.*.d1.temperature"))
+                          + ","
+                          + 
resultSet.getString(maxValue("root.*.d2.temperature"));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+      }
       Assert.assertEquals(retArray.length, cnt);
     }
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
index 1cde3f239b8..29314a95f53 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
@@ -355,27 +355,33 @@ public class IoTDBMaxByIT {
   public void testMaxByWithSlidingWindow() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
-      String[] expectedHeader =
-          new String[] {
-            TIMESTAMP_STR,
-            "max_by(root.db.d1.x1, root.db.d1.y2)",
-            "max_by(root.db.d1.x2, root.db.d1.y2)",
-            "max_by(root.db.d1.x3, root.db.d1.y2)",
-            "max_by(root.db.d1.x4, root.db.d1.y2)",
-            "max_by(root.db.d1.x5, root.db.d1.y2)",
-            "max_by(root.db.d1.x6, root.db.d1.y2)",
-          };
       String[] retArray =
           new String[] {
-            "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", 
"8,3,3,3.0,3.0,false,3,"
+            "0,3,3,3.0,3.0,false,3,",
+            "2,null,null,null,null,null,null,",
+            "4,null,null,null,null,null,null,",
+            "6,3,3,3.0,3.0,false,3,",
+            "8,3,3,3.0,3.0,false,3,"
           };
-      String y = "y2";
-      resultSetEqualTest(
-          String.format(
-              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms)",
-              y, y, y, y, y, y),
-          expectedHeader,
-          retArray);
+      String[] yArray = new String[] {"y1", "y2", "y3", "y4"};
+      for (String y : yArray) {
+        String[] expectedHeader =
+            new String[] {
+              TIMESTAMP_STR,
+              String.format("max_by(root.db.d1.x1, root.db.d1.%s)", y),
+              String.format("max_by(root.db.d1.x2, root.db.d1.%s)", y),
+              String.format("max_by(root.db.d1.x3, root.db.d1.%s)", y),
+              String.format("max_by(root.db.d1.x4, root.db.d1.%s)", y),
+              String.format("max_by(root.db.d1.x5, root.db.d1.%s)", y),
+              String.format("max_by(root.db.d1.x6, root.db.d1.%s)", y),
+            };
+        resultSetEqualTest(
+            String.format(
+                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms,2ms)",
+                y, y, y, y, y, y),
+            expectedHeader,
+            retArray);
+      }
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
index 47092e43962..922acc45aa3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
@@ -341,25 +341,25 @@ public class MaxByAccumulator implements Accumulator {
     ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
     switch (yDataType) {
       case INT32:
-        int intMaxVal = BytesUtils.bytesToInt(bytes);
+        int intMaxVal = BytesUtils.bytesToInt(bytes, 0);
         offset += Integer.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
         updateIntResult(intMaxVal, columnBuilder.build(), 0);
         break;
       case INT64:
-        long longMaxVal = BytesUtils.bytesToLong(bytes);
+        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 
0);
         offset += Long.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
         updateLongResult(longMaxVal, columnBuilder.build(), 0);
         break;
       case FLOAT:
-        float floatMaxVal = BytesUtils.bytesToFloat(bytes);
+        float floatMaxVal = BytesUtils.bytesToFloat(bytes, 0);
         offset += Float.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
         updateFloatResult(floatMaxVal, columnBuilder.build(), 0);
         break;
       case DOUBLE:
-        double doubleMaxVal = BytesUtils.bytesToDouble(bytes);
+        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, 0);
         offset += Long.BYTES;
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
         updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index 53e477be245..cfc6461d1bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationSt
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 
 import java.util.Comparator;
 import java.util.EnumMap;
@@ -47,6 +48,9 @@ public class SlidingWindowAggregatorFactory {
   private static final Map<TSDataType, Comparator<Column>> extremeComparators =
       new EnumMap<>(TSDataType.class);
 
+  private static final Map<TSDataType, Comparator<Column>> maxByComparators =
+      new EnumMap<>(TSDataType.class);
+
   static {
     // return a value greater than 0 if o1 is numerically greater than o2
     maxComparators.put(TSDataType.INT32, Comparator.comparingInt(o -> 
o.getInt(0)));
@@ -115,6 +119,20 @@ public class SlidingWindowAggregatorFactory {
           }
           return -1;
         });
+    // Intermediate Value of maxBy is a byte array: | y | xNull | x |
+    maxByComparators.put(
+        TSDataType.INT32,
+        Comparator.comparingInt(o -> 
BytesUtils.bytesToInt(o.getBinary(0).getValues(), 0)));
+    maxByComparators.put(
+        TSDataType.INT64,
+        Comparator.comparingLong(
+            o -> BytesUtils.bytesToLongFromOffset(o.getBinary(0).getValues(), 
Long.BYTES, 0)));
+    maxByComparators.put(
+        TSDataType.FLOAT,
+        Comparator.comparing(o -> 
BytesUtils.bytesToFloat(o.getBinary(0).getValues(), 0)));
+    maxByComparators.put(
+        TSDataType.DOUBLE,
+        Comparator.comparingDouble(o -> 
BytesUtils.bytesToDouble(o.getBinary(0).getValues(), 0)));
   }
 
   public static SlidingWindowAggregator createSlidingWindowAggregator(
@@ -159,6 +177,9 @@ public class SlidingWindowAggregatorFactory {
         return !ascending
             ? new NormalQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step)
             : new EmptyQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step);
+      case MAX_BY:
+        return new MonotonicQueueSlidingWindowAggregator(
+            accumulator, inputLocationList, step, 
maxByComparators.get(dataTypes.get(1)));
       case COUNT_IF:
         throw new SemanticException("COUNT_IF with slidingWindow is not 
supported now");
       case TIME_DURATION:

Reply via email to