This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e3de1b7964a Add max_by, min_by aggregation support
e3de1b7964a is described below

commit e3de1b7964af8f7dd23fe3060a6f5917d54db781
Author: Beyyes <[email protected]>
AuthorDate: Sat Oct 19 09:07:21 2024 +0800

    Add max_by, min_by aggregation support
---
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  |  12 +
 .../relational/aggregation/AccumulatorFactory.java |   4 +
 .../relational/aggregation/LastAccumulator.java    |   3 +-
 .../aggregation/TableMaxByAccumulator.java         |  73 ++++
 .../aggregation/TableMaxMinByBaseAccumulator.java  | 486 +++++++++++++++++++++
 .../aggregation/TableMinByAccumulator.java         |  73 ++++
 6 files changed, 650 insertions(+), 1 deletion(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index d7de73b7283..01d62a5c348 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -1050,6 +1050,18 @@ public class IoTDBMultiIDsWithAttributesTableIT {
     tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
   }
 
+  // Runs max_by/min_by of time against the y columns floatnum, date, ts and stringv of table0.
+  // NOTE(review): the floatnum pair is listed twice (columns 1-2 and 5-6); confirm whether a
+  // different column was intended for the second pair.
+  @Test
+  public void maxByMinByTest() {
+    String[] expectedHeader1 = buildHeaders(10);
+    sql =
+        "select max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv) from table0";
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
+  }
+
   // ==================================================================
   // ============================ Join Test ===========================
   // ==================================================================
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index fc672796b04..55a2c1e28a5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -146,6 +146,10 @@ public class AccumulatorFactory {
         return new LastByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1), false, false);
       case FIRST_BY:
         return new FirstByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1), false, false);
+      case MAX_BY:
+        return new TableMaxByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
+      case MIN_BY:
+        return new TableMinByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
index 0a359de585a..df94faae8fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
@@ -151,7 +151,8 @@ public class LastAccumulator implements TableAccumulator {
   public void evaluateIntermediate(ColumnBuilder columnBuilder) {
     checkArgument(
         columnBuilder instanceof BinaryColumnBuilder,
-        "intermediate input and output of Avg should be BinaryColumn");
+        "intermediate input and output of Last should be BinaryColumn");
+
     if (!initResult) {
       columnBuilder.appendNull();
     } else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
new file mode 100644
index 00000000000..ef0ebbb2b79
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+public class TableMaxByAccumulator extends TableMaxMinByBaseAccumulator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableMaxByAccumulator.class);
+
+  protected TableMaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    super(xDataType, yDataType);
+  }
+
+  @Override
+  protected boolean check(int yValue, int yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(long yValue, long yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(float yValue, float yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(double yValue, double yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(Binary yValue, Binary yExtremeValue) {
+    return yValue.compareTo(yExtremeValue) > 0;
+  }
+
+  @Override
+  protected boolean check(boolean yValue, boolean yExtremeValue) {
+    // true > false. Strict comparison, like the other overloads: a tie keeps the current result.
+    return yValue && !yExtremeValue;
+  }
+
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  @Override
+  public TableAccumulator copy() {
+    return new TableMaxByAccumulator(this.xDataType, this.yDataType);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
new file mode 100644
index 00000000000..2b883e3c0ee
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/** max_by/min_by(x, y): the value of x associated with the max/min of y over all input values. */
+public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator 
{
+
+  protected final TSDataType xDataType;
+
+  protected final TSDataType yDataType;
+
+  private final TsPrimitiveType yExtremeValue;
+
+  private final TsPrimitiveType xResult;
+
+  private boolean xNull = true;
+
+  private boolean initResult;
+
+  // private long yTimeStamp = Long.MAX_VALUE;
+
+  private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy/MinBy: %s";
+
+  protected TableMaxMinByBaseAccumulator(TSDataType xDataType, TSDataType 
yDataType) {
+    this.xDataType = xDataType;
+    this.yDataType = yDataType;
+    this.xResult = TsPrimitiveType.getByType(xDataType);
+    this.yExtremeValue = TsPrimitiveType.getByType(yDataType);
+  }
+
+  // Column should be like: | x | y |
+  @Override
+  public void addInput(Column[] arguments) {
+    checkArgument(arguments.length == 2, "Length of input Column[] for 
MaxBy/MinBy should be 2");
+    switch (yDataType) {
+      case INT32:
+      case DATE:
+        addIntInput(arguments);
+        return;
+      case INT64:
+      case TIMESTAMP:
+        addLongInput(arguments);
+        return;
+      case FLOAT:
+        addFloatInput(arguments);
+        return;
+      case DOUBLE:
+        addDoubleInput(arguments);
+        return;
+      case STRING:
+      case TEXT:
+      case BLOB:
+        addBinaryInput(arguments);
+        return;
+      case BOOLEAN:
+        addBooleanInput(arguments);
+        return;
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
+    }
+  }
+
+  @Override
+  public void addIntermediate(Column argument) {
+    checkArgument(
+        argument instanceof BinaryColumn || argument instanceof 
RunLengthEncodedColumn,
+        "intermediate input and output of max_by/min_by should be 
BinaryColumn");
+
+    for (int i = 0; i < argument.getPositionCount(); i++) {
+      if (argument.isNull(i)) {
+        continue;
+      }
+
+      byte[] bytes = argument.getBinary(i).getValues();
+      updateFromBytesIntermediateInput(bytes);
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics[] statistics) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  @Override
+  public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output of Max_By/Min_By should be 
BinaryColumn");
+
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
+    columnBuilder.writeBinary(new Binary(serialize()));
+  }
+
+  @Override
+  public void evaluateFinal(ColumnBuilder columnBuilder) {
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
+    writeX(columnBuilder);
+  }
+
+  @Override
+  public void reset() {
+    initResult = false;
+    xNull = true;
+    this.xResult.reset();
+    this.yExtremeValue.reset();
+    // yTimeStamp = Long.MAX_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  private void addIntInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateIntResult(int yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getInt())) {
+      initResult = true;
+      yExtremeValue.setInt(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addLongInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateLongResult(long yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getLong())) {
+      initResult = true;
+      yExtremeValue.setLong(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addFloatInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateFloatResult(float yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getFloat())) {
+      initResult = true;
+      yExtremeValue.setFloat(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addDoubleInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateDoubleResult(double yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getDouble())) {
+      initResult = true;
+      yExtremeValue.setDouble(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addBinaryInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateBinaryResult(column[1].getBinary(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateBinaryResult(Binary yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getBinary())) {
+      initResult = true;
+      yExtremeValue.setBinary(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addBooleanInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateBooleanResult(column[1].getBoolean(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateBooleanResult(boolean yValue, Column xColumn, int xIndex) 
{
+    if (!initResult || check(yValue, yExtremeValue.getBoolean())) {
+      initResult = true;
+      yExtremeValue.setBoolean(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void writeX(ColumnBuilder columnBuilder) {
+    if (xNull) {
+      columnBuilder.appendNull();
+      return;
+    }
+    switch (xDataType) {
+      case INT32:
+      case DATE:
+        columnBuilder.writeInt(xResult.getInt());
+        break;
+      case INT64:
+      case TIMESTAMP:
+        columnBuilder.writeLong(xResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(xResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(xResult.getDouble());
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        columnBuilder.writeBinary(xResult.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(xResult.getBoolean());
+        break;
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
xDataType));
+    }
+  }
+
+  private void updateX(Column xColumn, int xIndex) {
+    if (xColumn.isNull(xIndex)) {
+      xNull = true;
+    } else {
+      xNull = false;
+      switch (xDataType) {
+        case INT32:
+        case DATE:
+          xResult.setInt(xColumn.getInt(xIndex));
+          break;
+        case INT64:
+        case TIMESTAMP:
+          xResult.setLong(xColumn.getLong(xIndex));
+          break;
+        case FLOAT:
+          xResult.setFloat(xColumn.getFloat(xIndex));
+          break;
+        case DOUBLE:
+          xResult.setDouble(xColumn.getDouble(xIndex));
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+          xResult.setBinary(xColumn.getBinary(xIndex));
+          break;
+        case BOOLEAN:
+          xResult.setBoolean(xColumn.getBoolean(xIndex));
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
+      }
+    }
+  }
+
+  /**
+   * Serializes the current state as the intermediate result.
+   *
+   * <p>Layout: y extreme value, then a one-byte x-is-null flag, then the x value (written only
+   * when x is not null). This is the layout updateFromBytesIntermediateInput decodes.
+   *
+   * @return the encoded intermediate state
+   * @throws UnsupportedOperationException if writing to the in-memory stream fails
+   */
+  private byte[] serialize() {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      writeIntermediateToStream(yDataType, yExtremeValue, dataOutputStream);
+      dataOutputStream.writeBoolean(xNull);
+      if (!xNull) {
+        writeIntermediateToStream(xDataType, xResult, dataOutputStream);
+      }
+    } catch (IOException e) {
+      // This base class backs both max_by and min_by, so the message must not name only one.
+      throw new UnsupportedOperationException(
+          "Failed to serialize intermediate result for MaxBy/MinBy accumulator.", e);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /**
+   * Writes one value to the stream using the encoding that matches its data type.
+   *
+   * <p>TEXT, STRING and BLOB values are written as a 4-byte length followed by the raw bytes.
+   * The length is a byte count, because the reader in this class reads the length and then takes
+   * that many bytes; going through a String would count chars instead and corrupt BLOBs and
+   * non-ASCII text.
+   *
+   * @param dataType type that selects the encoding
+   * @param value holder of the value to write
+   * @param dataOutputStream destination stream
+   * @throws IOException if the underlying stream fails
+   * @throws UnSupportedDataTypeException if dataType is not handled here
+   */
+  private void writeIntermediateToStream(
+      TSDataType dataType, TsPrimitiveType value, DataOutputStream dataOutputStream)
+      throws IOException {
+    switch (dataType) {
+      case INT32:
+      case DATE:
+        dataOutputStream.writeInt(value.getInt());
+        break;
+      case INT64:
+      case TIMESTAMP:
+        dataOutputStream.writeLong(value.getLong());
+        break;
+      case FLOAT:
+        dataOutputStream.writeFloat(value.getFloat());
+        break;
+      case DOUBLE:
+        dataOutputStream.writeDouble(value.getDouble());
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        // Write the bytes as-is; DataOutputStream.writeBytes(String) drops the high byte of
+        // every char and String.length() is not the encoded byte length.
+        byte[] content = value.getBinary().getValues();
+        dataOutputStream.writeInt(content.length);
+        dataOutputStream.write(content);
+        break;
+      case BOOLEAN:
+        dataOutputStream.writeBoolean(value.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, dataType));
+    }
+  }
+
+  private void updateFromBytesIntermediateInput(byte[] bytes) {
+    // long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
+    int offset = 0;
+    // Use Column to store x value
+    TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(xDataType));
+    ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
+    switch (yDataType) {
+      case INT32:
+      case DATE:
+        int intMaxVal = BytesUtils.bytesToInt(bytes, offset);
+        offset += Integer.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateIntResult(intMaxVal, columnBuilder.build(), 0);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 
offset);
+        offset += Long.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateLongResult(longMaxVal, columnBuilder.build(), 0);
+        break;
+      case FLOAT:
+        float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
+        offset += Float.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateFloatResult(floatMaxVal, columnBuilder.build(), 0);
+        break;
+      case DOUBLE:
+        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset);
+        offset += Long.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0);
+        break;
+      case STRING:
+      case TEXT:
+      case BLOB:
+        int length = BytesUtils.bytesToInt(bytes, offset);
+        offset += Integer.BYTES;
+        Binary binaryMaxVal = new Binary(BytesUtils.subBytes(bytes, offset, 
length));
+        offset += length;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateBinaryResult(binaryMaxVal, columnBuilder.build(), 0);
+        break;
+      case BOOLEAN:
+        boolean booleanMaxVal = BytesUtils.bytesToBool(bytes, offset);
+        offset += 1;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateBooleanResult(booleanMaxVal, columnBuilder.build(), 0);
+        break;
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
+    }
+  }
+
+  private void readXFromBytesIntermediateInput(
+      byte[] bytes, int offset, ColumnBuilder columnBuilder) {
+    boolean isXNull = BytesUtils.bytesToBool(bytes, offset);
+    offset += 1;
+    if (isXNull) {
+      columnBuilder.appendNull();
+    } else {
+      switch (xDataType) {
+        case INT32:
+        case DATE:
+          columnBuilder.writeInt(BytesUtils.bytesToInt(bytes, offset));
+          break;
+        case INT64:
+        case TIMESTAMP:
+          columnBuilder.writeLong(BytesUtils.bytesToLongFromOffset(bytes, 8, 
offset));
+          break;
+        case FLOAT:
+          columnBuilder.writeFloat(BytesUtils.bytesToFloat(bytes, offset));
+          break;
+        case DOUBLE:
+          columnBuilder.writeDouble(BytesUtils.bytesToDouble(bytes, offset));
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+          int length = BytesUtils.bytesToInt(bytes, offset);
+          offset += Integer.BYTES;
+          columnBuilder.writeBinary(new Binary(BytesUtils.subBytes(bytes, 
offset, length)));
+          break;
+        case BOOLEAN:
+          columnBuilder.writeBoolean(BytesUtils.bytesToBool(bytes, offset));
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
+      }
+    }
+  }
+
+  /**
+   * @param yValue Input y.
+   * @param yExtremeValue Current extreme value of y.
+   * @return True if yValue is the new extreme value.
+   */
+  protected abstract boolean check(int yValue, int yExtremeValue);
+
+  protected abstract boolean check(long yValue, long yExtremeValue);
+
+  protected abstract boolean check(float yValue, float yExtremeValue);
+
+  protected abstract boolean check(double yValue, double yExtremeValue);
+
+  protected abstract boolean check(Binary yValue, Binary yExtremeValue);
+
+  protected abstract boolean check(boolean yValue, boolean yExtremeValue);
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
new file mode 100644
index 00000000000..d4fbadae512
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+public class TableMinByAccumulator extends TableMaxMinByBaseAccumulator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableMinByAccumulator.class);
+
+  protected TableMinByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    super(xDataType, yDataType);
+  }
+
+  @Override
+  protected boolean check(int yValue, int yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(long yValue, long yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(float yValue, float yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(double yValue, double yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(Binary yValue, Binary yExtremeValue) {
+    return yValue.compareTo(yExtremeValue) < 0;
+  }
+
+  @Override
+  protected boolean check(boolean yValue, boolean yExtremeValue) {
+    // false < true. Strict comparison, like the other overloads: a tie keeps the current result.
+    return !yValue && yExtremeValue;
+  }
+
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  @Override
+  public TableAccumulator copy() {
+    return new TableMinByAccumulator(xDataType, yDataType);
+  }
+}

Reply via email to