This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch max_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f3453e07b8cabcdf04f8fdbc012372f99800eb6b
Author: lancelly <[email protected]>
AuthorDate: Wed Jan 17 22:49:39 2024 +0800

    MaxByAccumulator
---
 .../execution/aggregation/MaxByAccumulator.java    | 343 +++++++++++++++++++++
 1 file changed, 343 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
new file mode 100644
index 00000000000..d393e8507cb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/** max(x,y) returns the value of x associated with the maximum value of y 
over all input values. */
+public class MaxByAccumulator implements Accumulator {
+
+  private final TSDataType xDataType;
+
+  private final TSDataType yDataType;
+
+  private final TsPrimitiveType yMaxValue;
+
+  private final TsPrimitiveType xResult;
+
+  private boolean xNull = true;
+
+  private boolean initResult;
+
+  private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy: %s";
+
+  public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    this.xDataType = xDataType;
+    this.yDataType = yDataType;
+    this.xResult = TsPrimitiveType.getByType(xDataType);
+    this.yMaxValue = TsPrimitiveType.getByType(yDataType);
+  }
+
+  // Column should be like: | Time | x | y |
+  @Override
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    switch (yDataType) {
+      case INT32:
+        addIntInput(column, bitMap, lastIndex);
+        return;
+      case INT64:
+        addLongInput(column, bitMap, lastIndex);
+        return;
+      case FLOAT:
+        addFloatInput(column, bitMap, lastIndex);
+        return;
+      case DOUBLE:
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
+    }
+  }
+
+  // partialResult should be like: | partialX | partialY |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 2, "partialResult of MaxBy should be 
2");
+    // Return if y is null.
+    if (partialResult[1].isNull(0)) {
+      return;
+    }
+    switch (yDataType) {
+      case INT32:
+        updateIntResult(partialResult[1].getInt(0), partialResult[0], 0);
+        break;
+      case INT64:
+        updateLongResult(partialResult[1].getLong(0), partialResult[0], 0);
+        break;
+      case FLOAT:
+        updateFloatResult(partialResult[1].getFloat(0), partialResult[0], 0);
+        break;
+      case DOUBLE:
+        updateDoubleResult(partialResult[1].getDouble(0), partialResult[0], 0);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  // finalResult should be single column, like: | finalXValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    if (finalResult.isNull(0)) {
+      return;
+    }
+    initResult = true;
+    if (finalResult.isNull(0)) {
+      xNull = true;
+      return;
+    }
+    switch (xDataType) {
+      case INT32:
+        xResult.setInt(finalResult.getInt(0));
+        break;
+      case INT64:
+        xResult.setLong(finalResult.getLong(0));
+        break;
+      case FLOAT:
+        xResult.setFloat(finalResult.getFloat(0));
+        break;
+      case DOUBLE:
+        xResult.setDouble(finalResult.getDouble(0));
+        break;
+      case TEXT:
+        xResult.setBinary(finalResult.getBinary(0));
+        break;
+      case BOOLEAN:
+        xResult.setBoolean(finalResult.getBoolean(0));
+        break;
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
xDataType));
+    }
+  }
+
+  // columnBuilders should be like | xColumnBuilder | yColumnBuilder |
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 2, "partialResult of MaxValue 
should be 2");
+    if (!initResult) {
+      columnBuilders[0].appendNull();
+      columnBuilders[1].appendNull();
+      return;
+    }
+    switch (yDataType) {
+      case INT32:
+        writeX(columnBuilders[0]);
+        columnBuilders[1].writeInt(yMaxValue.getInt());
+        break;
+      case INT64:
+        writeX(columnBuilders[0]);
+        columnBuilders[1].writeLong(yMaxValue.getLong());
+        break;
+      case FLOAT:
+        writeX(columnBuilders[0]);
+        columnBuilders[1].writeFloat(yMaxValue.getFloat());
+        break;
+      case DOUBLE:
+        writeX(columnBuilders[0]);
+        columnBuilders[1].writeDouble(yMaxValue.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
+    }
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
+    writeX(columnBuilder);
+  }
+
+  @Override
+  public void reset() {
+    initResult = false;
+    xNull = true;
+    this.xResult.reset();
+    this.yMaxValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {xDataType, yDataType};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return xDataType;
+  }
+
+  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (!column[2].isNull(i)) {
+        updateIntResult(column[2].getInt(i), column[1], i);
+      }
+    }
+  }
+
+  private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) {
+    if (!initResult || yMaxVal > yMaxValue.getInt()) {
+      initResult = true;
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (!column[2].isNull(i)) {
+        updateLongResult(column[2].getLong(i), column[1], i);
+      }
+    }
+  }
+
+  private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) {
+    if (!initResult || yMaxVal > yMaxValue.getLong()) {
+      initResult = true;
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (!column[2].isNull(i)) {
+        updateFloatResult(column[2].getFloat(i), column[1], i);
+      }
+    }
+  }
+
+  private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) {
+    if (!initResult || yMaxVal > yMaxValue.getFloat()) {
+      initResult = true;
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (!column[2].isNull(i)) {
+        updateDoubleResult(column[2].getDouble(i), column[1], i);
+      }
+    }
+  }
+
+  private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) {
+    if (!initResult || yMaxVal > yMaxValue.getDouble()) {
+      initResult = true;
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void writeX(ColumnBuilder columnBuilder) {
+    if (xNull) {
+      columnBuilder.appendNull();
+      return;
+    }
+    switch (xDataType) {
+      case INT32:
+        columnBuilder.writeInt(xResult.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(xResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(xResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(xResult.getDouble());
+        break;
+      case TEXT:
+        columnBuilder.writeBinary(xResult.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(xResult.getBoolean());
+        break;
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
xDataType));
+    }
+  }
+
+  private void updateX(Column xColumn, int xIndex) {
+    if (xColumn.isNull(xIndex)) {
+      xNull = true;
+    } else {
+      xNull = false;
+      switch (xDataType) {
+        case INT32:
+          xResult.setInt(xColumn.getInt(xIndex));
+          break;
+        case INT64:
+          xResult.setLong(xColumn.getLong(xIndex));
+          break;
+        case FLOAT:
+          xResult.setFloat(xColumn.getFloat(xIndex));
+          break;
+        case DOUBLE:
+          xResult.setDouble(xColumn.getDouble(xIndex));
+          break;
+        case TEXT:
+          xResult.setBinary(xColumn.getBinary(xIndex));
+          break;
+        case BOOLEAN:
+          xResult.setBoolean(xColumn.getBoolean(xIndex));
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
+      }
+    }
+  }
+}

Reply via email to