This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a6cd2b07637 Add extreme aggregation for table model
a6cd2b07637 is described below
commit a6cd2b076378120564d7b505749768f850653a78
Author: Beyyes <[email protected]>
AuthorDate: Sat Oct 19 16:16:48 2024 +0800
Add extreme aggregation for table model
---
.../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 11 +-
.../relational/aggregation/AccumulatorFactory.java | 2 +
.../relational/aggregation/ExtremeAccumulator.java | 306 +++++++++++++++++++++
3 files changed, 316 insertions(+), 3 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index 01d62a5c348..568457e8c3d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -1051,15 +1051,20 @@ public class IoTDBMultiIDsWithAttributesTableIT {
}
@Test
- public void maxByMinByTest() {
- String[] expectedHeader1 = buildHeaders(10);
+ public void maxByMinByExtremeTest() {
+ expectedHeader = buildHeaders(10);
sql =
"select
max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv)
from table0";
retArray =
new String[] {
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
};
- tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ expectedHeader = buildHeaders(3);
+ sql = "select extreme(num),extreme(bignum),extreme(floatnum) from table0";
+ retArray = new String[] {"15,3147483648,4654.231,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}
// ==================================================================
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index 55a2c1e28a5..0dc87039ecb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -150,6 +150,8 @@ public class AccumulatorFactory {
return new TableMaxByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1));
case MIN_BY:
return new TableMinByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1));
+ case EXTREME:
+ return new ExtremeAccumulator(inputDataTypes.get(0));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggregationType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
new file mode 100644
index 00000000000..cdcd08295b6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+public class ExtremeAccumulator implements TableAccumulator {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(ExtremeAccumulator.class);
+ private static final String UNSUPPORTED_DATA_TYPE = "Unsupported data type
in Extreme: %s";
+ private final TSDataType seriesDataType;
+ private final TsPrimitiveType extremeResult;
+ private boolean initResult;
+
+ public ExtremeAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE;
+ }
+
+ @Override
+ public TableAccumulator copy() {
+ return new ExtremeAccumulator(seriesDataType);
+ }
+
+ @Override
+ public void addInput(Column[] arguments) {
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(arguments[0]);
+ return;
+ case INT64:
+ addLongInput(arguments[0]);
+ return;
+ case FLOAT:
+ addFloatInput(arguments[0]);
+ return;
+ case DOUBLE:
+ addDoubleInput(arguments[0]);
+ return;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ case BOOLEAN:
+ case DATE:
+ case TIMESTAMP:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(UNSUPPORTED_DATA_TYPE, seriesDataType));
+ }
+ }
+
+ @Override
+ public void addIntermediate(Column argument) {
+ for (int i = 0; i < argument.getPositionCount(); i++) {
+ if (argument.isNull(i)) {
+ continue;
+ }
+
+ switch (seriesDataType) {
+ case INT32:
+ updateIntResult(argument.getInt(i));
+ break;
+ case INT64:
+ updateLongResult(argument.getLong(i));
+ break;
+ case FLOAT:
+ updateFloatResult(argument.getFloat(i));
+ break;
+ case DOUBLE:
+ updateDoubleResult(argument.getDouble(i));
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ case BOOLEAN:
+ case DATE:
+ case TIMESTAMP:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(UNSUPPORTED_DATA_TYPE, seriesDataType));
+ }
+ }
+ }
+
+ @Override
+ public void addStatistics(Statistics[] statistics) {
+ if (statistics == null || statistics[0] == null) {
+ return;
+ }
+
+ switch (seriesDataType) {
+ case INT32:
+ updateIntResult((int) statistics[0].getMaxValue());
+ updateIntResult((int) statistics[0].getMinValue());
+ break;
+ case INT64:
+ updateLongResult((long) statistics[0].getMaxValue());
+ updateLongResult((long) statistics[0].getMinValue());
+ break;
+ case FLOAT:
+ updateFloatResult((float) statistics[0].getMaxValue());
+ updateFloatResult((float) statistics[0].getMinValue());
+ break;
+ case DOUBLE:
+ updateDoubleResult((double) statistics[0].getMaxValue());
+ updateDoubleResult((double) statistics[0].getMinValue());
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ case BOOLEAN:
+ case DATE:
+ case TIMESTAMP:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(UNSUPPORTED_DATA_TYPE, seriesDataType));
+ }
+ }
+
+ @Override
+ public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+ if (!initResult) {
+ columnBuilder.appendNull();
+ return;
+ }
+
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt(extremeResult.getInt());
+ break;
+ case INT64:
+ columnBuilder.writeLong(extremeResult.getLong());
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat(extremeResult.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(extremeResult.getDouble());
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ case BOOLEAN:
+ case DATE:
+ case TIMESTAMP:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(UNSUPPORTED_DATA_TYPE, seriesDataType));
+ }
+ }
+
+ @Override
+ public void evaluateFinal(ColumnBuilder columnBuilder) {
+ if (!initResult) {
+ columnBuilder.appendNull();
+ return;
+ }
+
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt(extremeResult.getInt());
+ break;
+ case INT64:
+ columnBuilder.writeLong(extremeResult.getLong());
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat(extremeResult.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(extremeResult.getDouble());
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ case BOOLEAN:
+ case DATE:
+ case TIMESTAMP:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(UNSUPPORTED_DATA_TYPE, seriesDataType));
+ }
+ }
+
+ @Override
+ public void reset() {
+ initResult = false;
+ extremeResult.reset();
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ private void addIntInput(Column column) {
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ if (!column.isNull(i)) {
+ updateIntResult(column.getInt(i));
+ }
+ }
+ }
+
+ private void updateIntResult(int val) {
+ int absExtVal = Math.abs(val);
+ int candidateResult = extremeResult.getInt();
+ int absCandidateResult = Math.abs(extremeResult.getInt());
+
+ if (!initResult
+ || (absExtVal > absCandidateResult)
+ || (absExtVal == absCandidateResult) && val > candidateResult) {
+ initResult = true;
+ extremeResult.setInt(val);
+ }
+ }
+
+ private void addLongInput(Column column) {
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ if (!column.isNull(i)) {
+ updateLongResult(column.getLong(i));
+ }
+ }
+ }
+
+ private void updateLongResult(long val) {
+ long absExtVal = Math.abs(val);
+ long candidateResult = extremeResult.getLong();
+ long absCandidateResult = Math.abs(extremeResult.getLong());
+
+ if (!initResult
+ || (absExtVal > absCandidateResult)
+ || (absExtVal == absCandidateResult) && val > candidateResult) {
+ initResult = true;
+ extremeResult.setLong(val);
+ }
+ }
+
+ private void addFloatInput(Column column) {
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ if (!column.isNull(i)) {
+ updateFloatResult(column.getFloat(i));
+ }
+ }
+ }
+
+ private void updateFloatResult(float val) {
+ float absExtVal = Math.abs(val);
+ float candidateResult = extremeResult.getFloat();
+ float absCandidateResult = Math.abs(extremeResult.getFloat());
+
+ if (!initResult
+ || (absExtVal > absCandidateResult)
+ || (absExtVal == absCandidateResult) && val > candidateResult) {
+ initResult = true;
+ extremeResult.setFloat(val);
+ }
+ }
+
+ private void addDoubleInput(Column column) {
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ if (!column.isNull(i)) {
+ updateDoubleResult(column.getDouble(i));
+ }
+ }
+ }
+
+ private void updateDoubleResult(double val) {
+ double absExtVal = Math.abs(val);
+ double candidateResult = extremeResult.getDouble();
+ double absCandidateResult = Math.abs(extremeResult.getDouble());
+
+ if (!initResult
+ || (absExtVal > absCandidateResult)
+ || (absExtVal == absCandidateResult) && val > candidateResult) {
+ initResult = true;
+ extremeResult.setDouble(val);
+ }
+ }
+}