This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch add_max_min_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/add_max_min_by by this push:
     new b2288869f63 add it
b2288869f63 is described below

commit b2288869f63a97afffc012e46850ee2055da6e49
Author: Beyyes <[email protected]>
AuthorDate: Sat Oct 19 00:37:35 2024 +0800

    add it
---
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  |  12 ++
 .../aggregation/TableMaxByAccumulator.java         |   5 +
 .../aggregation/TableMaxMinByBaseAccumulator.java  | 184 ++++++++++++++++-----
 .../aggregation/TableMinByAccumulator.java         |   5 +
 4 files changed, 161 insertions(+), 45 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index d7de73b7283..01d62a5c348 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -1050,6 +1050,18 @@ public class IoTDBMultiIDsWithAttributesTableIT {
     tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
   }
 
+  @Test
+  public void maxByMinByTest() {
+    String[] expectedHeader1 = buildHeaders(10);
+    sql =
+        "select max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv) from table0";
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
+  }
+
   // ==================================================================
   // ============================ Join Test ===========================
   // ==================================================================
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
index b8124e32421..ef0ebbb2b79 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
@@ -56,6 +56,11 @@ public class TableMaxByAccumulator extends TableMaxMinByBaseAccumulator {
     return yValue.compareTo(yExtremeValue) > 0;
   }
 
+  @Override
+  protected boolean check(boolean yValue, boolean yExtremeValue) {
+    return yValue;
+  }
+
   @Override
   public long getEstimatedSize() {
     return INSTANCE_SIZE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
index 1ce7cc6165b..2b883e3c0ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
@@ -54,7 +54,7 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
 
   private boolean initResult;
 
-  private long yTimeStamp = Long.MAX_VALUE;
+  // private long yTimeStamp = Long.MAX_VALUE;
 
   private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy/MinBy: %s";
 
@@ -65,7 +65,7 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
     this.yExtremeValue = TsPrimitiveType.getByType(yDataType);
   }
 
-  // Column should be like: | Time | x | y |
+  // Column should be like: | x | y |
   @Override
   public void addInput(Column[] arguments) {
     checkArgument(arguments.length == 2, "Length of input Column[] for MaxBy/MinBy should be 2");
@@ -75,21 +75,23 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
         addIntInput(arguments);
         return;
       case INT64:
-        //      case TIMESTAMP:
-        //        addLongInput(arguments);
-        //        return;
-        //      case FLOAT:
-        //        addFloatInput(arguments);
-        //        return;
-        //      case DOUBLE:
-        //        addDoubleInput(arguments);
-        //        return;
-        //      case STRING:
-        //        addBinaryInput(arguments);
-        //        return;
+      case TIMESTAMP:
+        addLongInput(arguments);
+        return;
+      case FLOAT:
+        addFloatInput(arguments);
+        return;
+      case DOUBLE:
+        addDoubleInput(arguments);
+        return;
+      case STRING:
       case TEXT:
       case BLOB:
+        addBinaryInput(arguments);
+        return;
       case BOOLEAN:
+        addBooleanInput(arguments);
+        return;
       default:
         throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType));
     }
@@ -144,7 +146,7 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
     xNull = true;
     this.xResult.reset();
     this.yExtremeValue.reset();
-    yTimeStamp = Long.MAX_VALUE;
+    // yTimeStamp = Long.MAX_VALUE;
   }
 
   @Override
@@ -169,6 +171,91 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
     }
   }
 
+  private void addLongInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateLongResult(long yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getLong())) {
+      initResult = true;
+      yExtremeValue.setLong(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addFloatInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateFloatResult(float yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getFloat())) {
+      initResult = true;
+      yExtremeValue.setFloat(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addDoubleInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateDoubleResult(double yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getDouble())) {
+      initResult = true;
+      yExtremeValue.setDouble(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addBinaryInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateBinaryResult(column[1].getBinary(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateBinaryResult(Binary yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getBinary())) {
+      initResult = true;
+      yExtremeValue.setBinary(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addBooleanInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateBooleanResult(column[1].getBoolean(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateBooleanResult(boolean yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getBoolean())) {
+      initResult = true;
+      yExtremeValue.setBoolean(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
   private void writeX(ColumnBuilder columnBuilder) {
     if (xNull) {
       columnBuilder.appendNull();
@@ -241,7 +328,7 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
     try {
-      dataOutputStream.writeLong(yTimeStamp);
+      // dataOutputStream.writeLong(yTimeStamp);
       writeIntermediateToStream(yDataType, yExtremeValue, dataOutputStream);
       dataOutputStream.writeBoolean(xNull);
       if (!xNull) {
@@ -288,8 +375,8 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
   }
 
   private void updateFromBytesIntermediateInput(byte[] bytes) {
-    long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
-    int offset = Long.BYTES;
+    // long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
+    int offset = 0;
     // Use Column to store x value
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(xDataType));
     ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
@@ -301,36 +388,41 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
         readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
         updateIntResult(intMaxVal, columnBuilder.build(), 0);
         break;
-        //      case INT64:
-        //      case TIMESTAMP:
-        //        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset);
-        //        offset += Long.BYTES;
-        //        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        //        updateLongResult(time, longMaxVal, columnBuilder.build(), 0);
-        //        break;
-        //      case FLOAT:
-        //        float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
-        //        offset += Float.BYTES;
-        //        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        //        updateFloatResult(time, floatMaxVal, columnBuilder.build(), 0);
-        //        break;
-        //      case DOUBLE:
-        //        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset);
-        //        offset += Long.BYTES;
-        //        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        //        updateDoubleResult(time, doubleMaxVal, columnBuilder.build(), 0);
-        //        break;
-        //      case STRING:
-        //        int length = BytesUtils.bytesToInt(bytes, offset);
-        //        offset += Integer.BYTES;
-        //        Binary binaryMaxVal = new Binary(BytesUtils.subBytes(bytes, offset, length));
-        //        offset += length;
-        //        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        //        updateBinaryResult(time, binaryMaxVal, columnBuilder.build(), 0);
-        //        break;
+      case INT64:
+      case TIMESTAMP:
+        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset);
+        offset += Long.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateLongResult(longMaxVal, columnBuilder.build(), 0);
+        break;
+      case FLOAT:
+        float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
+        offset += Float.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateFloatResult(floatMaxVal, columnBuilder.build(), 0);
+        break;
+      case DOUBLE:
+        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset);
+        offset += Long.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0);
+        break;
+      case STRING:
       case TEXT:
       case BLOB:
+        int length = BytesUtils.bytesToInt(bytes, offset);
+        offset += Integer.BYTES;
+        Binary binaryMaxVal = new Binary(BytesUtils.subBytes(bytes, offset, length));
+        offset += length;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateBinaryResult(binaryMaxVal, columnBuilder.build(), 0);
+        break;
       case BOOLEAN:
+        boolean booleanMaxVal = BytesUtils.bytesToBool(bytes, offset);
+        offset += 1;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateBooleanResult(booleanMaxVal, columnBuilder.build(), 0);
+        break;
       default:
         throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType));
     }
@@ -389,4 +481,6 @@ public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
   protected abstract boolean check(double yValue, double yExtremeValue);
 
   protected abstract boolean check(Binary yValue, Binary yExtremeValue);
+
+  protected abstract boolean check(boolean yValue, boolean yExtremeValue);
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
index 3b6d32195c8..d4fbadae512 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
@@ -56,6 +56,11 @@ public class TableMinByAccumulator extends TableMaxMinByBaseAccumulator {
     return yValue.compareTo(yExtremeValue) < 0;
   }
 
+  @Override
+  protected boolean check(boolean yValue, boolean yExtremeValue) {
+    return !yValue;
+  }
+
   @Override
   public long getEstimatedSize() {
     return INSTANCE_SIZE;

Reply via email to