This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aec7dfb3d61 Add PERCENTILE aggregation function and related validation
(#16545)
aec7dfb3d61 is described below
commit aec7dfb3d61468ac880e1910e23b3a3155779d2f
Author: FearfulTomcat27 <[email protected]>
AuthorDate: Fri Jun 12 14:13:57 2026 +0800
Add PERCENTILE aggregation function and related validation (#16545)
---
.../it/query/recent/IoTDBTableAggregationIT.java | 52 ++++
.../operator/source/relational/Percentile.java | 173 ++++++++++++
.../relational/aggregation/AccumulatorFactory.java | 28 +-
.../aggregation/PercentileAccumulator.java | 266 +++++++++++++++++++
.../grouped/GroupedPercentileAccumulator.java | 291 +++++++++++++++++++++
.../grouped/array/PercentileBigArray.java | 73 ++++++
.../calc/plan/planner/TableOperatorGenerator.java | 70 +++--
.../iotdb/calc/utils/constant/SqlConstant.java | 1 +
.../planner/DataNodeTableOperatorGenerator.java | 3 +-
.../relational/metadata/TableMetadataImpl.java | 28 +-
.../plan/relational/sql/parser/AstBuilder.java | 6 +
.../process/window/function/FunctionTestUtils.java | 5 +-
.../distribution/AggregationTableScanTest.java | 4 +-
.../analyzer/AggregationCornerCaseTest.java | 12 +-
.../TableBuiltinAggregationFunction.java | 4 +-
.../thrift-commons/src/main/thrift/common.thrift | 1 +
16 files changed, 985 insertions(+), 32 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index bd44d5db4f6..aa635d8288d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -4388,6 +4388,42 @@ public class IoTDBTableAggregationIT {
DATABASE_NAME);
}
+ @Test
+ public void percentileTest() {
+ tableResultSetEqualTest(
+ "select percentile(time,
0.5),percentile(s1,0.5),percentile(s2,0.5),percentile(s3,0.5),percentile(s4,0.5),percentile(s9,0.5)
from table1",
+ buildHeaders(6),
+ new String[]
{"2024-09-24T06:15:40.000Z,40,43000,37.5,43.0,2024-09-24T06:15:40.000Z,"},
+ DATABASE_NAME);
+
+ tableResultSetEqualTest(
+ "select time,province,percentile(time,
0.5),percentile(s1,0.5),percentile(s2,0.5) from table1 group by 1,2 order by
2,1",
+ new String[] {"time", "province", "_col2", "_col3", "_col4"},
+ new String[] {
+ "2024-09-24T06:15:30.000Z,beijing,2024-09-24T06:15:30.000Z,30,null,",
+
"2024-09-24T06:15:31.000Z,beijing,2024-09-24T06:15:31.000Z,null,31000,",
+
"2024-09-24T06:15:35.000Z,beijing,2024-09-24T06:15:35.000Z,null,35000,",
+ "2024-09-24T06:15:36.000Z,beijing,2024-09-24T06:15:36.000Z,36,null,",
+
"2024-09-24T06:15:40.000Z,beijing,2024-09-24T06:15:40.000Z,40,40000,",
+ "2024-09-24T06:15:41.000Z,beijing,2024-09-24T06:15:41.000Z,41,null,",
+
"2024-09-24T06:15:46.000Z,beijing,2024-09-24T06:15:46.000Z,null,46000,",
+
"2024-09-24T06:15:50.000Z,beijing,2024-09-24T06:15:50.000Z,null,50000,",
+
"2024-09-24T06:15:51.000Z,beijing,2024-09-24T06:15:51.000Z,null,null,",
+ "2024-09-24T06:15:55.000Z,beijing,2024-09-24T06:15:55.000Z,55,null,",
+
"2024-09-24T06:15:30.000Z,shanghai,2024-09-24T06:15:30.000Z,30,null,",
+
"2024-09-24T06:15:31.000Z,shanghai,2024-09-24T06:15:31.000Z,null,31000,",
+
"2024-09-24T06:15:35.000Z,shanghai,2024-09-24T06:15:35.000Z,null,35000,",
+
"2024-09-24T06:15:36.000Z,shanghai,2024-09-24T06:15:36.000Z,36,null,",
+
"2024-09-24T06:15:40.000Z,shanghai,2024-09-24T06:15:40.000Z,40,40000,",
+
"2024-09-24T06:15:41.000Z,shanghai,2024-09-24T06:15:41.000Z,41,null,",
+
"2024-09-24T06:15:46.000Z,shanghai,2024-09-24T06:15:46.000Z,null,46000,",
+
"2024-09-24T06:15:50.000Z,shanghai,2024-09-24T06:15:50.000Z,null,50000,",
+
"2024-09-24T06:15:51.000Z,shanghai,2024-09-24T06:15:51.000Z,null,null,",
+
"2024-09-24T06:15:55.000Z,shanghai,2024-09-24T06:15:55.000Z,55,null,",
+ },
+ DATABASE_NAME);
+ }
+
@Test
public void exceptionTest() {
tableAssertTestFail(
@@ -4478,6 +4514,22 @@ public class IoTDBTableAggregationIT {
"select 1 as g, approx_percentile(s1,s2,0.5) from table1 group by 1",
"701: Aggregation functions [approx_percentile] do not support weight
as INT64 type",
DATABASE_NAME);
+ tableAssertTestFail(
+ "select percentile() from table1",
+ "701: Aggregation functions [percentile] should only have two
arguments",
+ DATABASE_NAME);
+ tableAssertTestFail(
+ "select percentile(s1,1.1) from table1",
+ "701: percentage should be in [0,1], got 1.1",
+ DATABASE_NAME);
+ tableAssertTestFail(
+ "select percentile(s1,'test') from table1",
+ "701: The second argument of 'percentile' function percentage must be
a double literal",
+ DATABASE_NAME);
+ tableAssertTestFail(
+ "select percentile(s5,0.5) from table1",
+ "701: Aggregation functions [percentile] should have value column as
numeric type [INT32, INT64, FLOAT, DOUBLE, TIMESTAMP]",
+ DATABASE_NAME);
}
// ==================================================================
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/Percentile.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/Percentile.java
new file mode 100644
index 00000000000..5e02d0a1def
--- /dev/null
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/Percentile.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.calc.execution.operator.source.relational;
+
+import org.apache.iotdb.commons.exception.SemanticException;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class Percentile {
+ private double[] values;
+ private int size;
+ private int capacity;
+ private boolean sorted;
+
+ private static final int INITIAL_CAPACITY = 32;
+ private static final double GROWTH_FACTOR = 1.5;
+
+ public Percentile() {
+ this.capacity = INITIAL_CAPACITY;
+ this.values = new double[capacity];
+ this.size = 0;
+ this.sorted = true;
+ }
+
+ public void addValue(double value) {
+ ensureCapacity();
+ values[size++] = value;
+ sorted = false;
+ }
+
+ public void addValues(double... vals) {
+ if (vals == null || vals.length == 0) {
+ return;
+ }
+
+ int newSize = size + vals.length;
+ if (newSize > capacity) {
+ grow(newSize);
+ }
+
+ System.arraycopy(vals, 0, values, size, vals.length);
+ size = newSize;
+ sorted = false;
+ }
+
+ public void merge(Percentile other) {
+ if (other == null || other.size == 0) {
+ return;
+ }
+
+ int newSize = size + other.size;
+ if (newSize > capacity) {
+ grow(newSize);
+ }
+
+ System.arraycopy(other.values, 0, values, size, other.size);
+ size = newSize;
+ sorted = false;
+ }
+
+ public double getPercentile(double percentile) {
+ if (size == 0) {
+ return Double.NaN;
+ }
+ if (percentile < 0.0 || percentile > 1.0) {
+ throw new SemanticException("percentage should be in [0,1], got " +
percentile);
+ }
+
+ ensureSorted();
+
+ if (size == 1) {
+ return values[0];
+ }
+
+ double realIndex = percentile * (size - 1);
+ int index = (int) realIndex;
+ double fraction = realIndex - index;
+
+ if (index >= size - 1) {
+ return values[size - 1];
+ }
+
+ return values[index] + fraction * (values[index + 1] - values[index]);
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void clear() {
+ // Shrink the backing array back to the initial capacity so the memory
held by a large group is
+ // actually released on reset, instead of staying reserved at the
historical peak capacity.
+ if (capacity > INITIAL_CAPACITY) {
+ capacity = INITIAL_CAPACITY;
+ values = new double[capacity];
+ }
+ size = 0;
+ sorted = true;
+ }
+
+ private void ensureCapacity() {
+ if (size >= capacity) {
+ grow(size + 1);
+ }
+ }
+
+ private void grow(int minCapacity) {
+ int newCapacity = Math.max((int) (capacity * GROWTH_FACTOR), minCapacity);
+ double[] newValues = new double[newCapacity];
+ System.arraycopy(values, 0, newValues, 0, size);
+ values = newValues;
+ capacity = newCapacity;
+ }
+
+ private void ensureSorted() {
+ if (!sorted && size > 1) {
+ Arrays.sort(values, 0, size);
+ sorted = true;
+ }
+ }
+
+ public void serialize(ByteBuffer buffer) {
+ ReadWriteIOUtils.write(size, buffer);
+ for (int i = 0; i < size; i++) {
+ ReadWriteIOUtils.write(values[i], buffer);
+ }
+ }
+
+ public static Percentile deserialize(ByteBuffer buffer) {
+ int size = ReadWriteIOUtils.readInt(buffer);
+ Percentile percentile = new Percentile();
+ if (size > percentile.capacity) {
+ percentile.capacity = size;
+ percentile.values = new double[size];
+ }
+ percentile.size = size;
+ for (int i = 0; i < size; i++) {
+ percentile.values[i] = ReadWriteIOUtils.readDouble(buffer);
+ }
+ percentile.sorted = false;
+ return percentile;
+ }
+
+ public int getSerializedSize() {
+ return Integer.BYTES + (int) ((long) size * Double.BYTES);
+ }
+
+ public long getEstimatedSize() {
+ return RamUsageEstimator.shallowSizeOfInstance(Percentile.class)
+ + (long) capacity * Double.BYTES;
+ }
+}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index f4c0d98ce65..a37f1bef0c6 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.gr
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedMinAccumulator;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedMinByAccumulator;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedModeAccumulator;
+import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedPercentileAccumulator;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedRegressionAccumulator;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedUserDefinedAggregateAccumulator;
@@ -56,6 +57,7 @@ import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.gr
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.IntGroupedApproxMostFrequentAccumulator;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.LongGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.calc.i18n.CalcMessages;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import
org.apache.iotdb.commons.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
import
org.apache.iotdb.commons.queryengine.execution.operator.source.relational.aggregation.grouped.hash.MarkDistinctHash;
@@ -106,7 +108,8 @@ public class AccumulatorFactory {
boolean isAggTableScan,
String timeColumnName,
Set<String> measurementColumnNames,
- boolean distinct) {
+ boolean distinct,
+ MemoryReservationManager memoryReservationManager) {
TableAccumulator result;
// Input expression size of 1 indicates aggregation split has occurred and
this is a final
@@ -166,7 +169,7 @@ public class AccumulatorFactory {
? new FirstAccumulator(inputDataTypes.get(0), isAggTableScan)
: new FirstDescAccumulator(inputDataTypes.get(0));
} else {
- result = createBuiltinAccumulator(aggregationType, inputDataTypes);
+ result = createBuiltinAccumulator(aggregationType, inputDataTypes,
memoryReservationManager);
}
if (distinct) {
@@ -188,7 +191,8 @@ public class AccumulatorFactory {
List<Expression> inputExpressions,
Map<String, String> inputAttributes,
boolean ascending,
- boolean distinct) {
+ boolean distinct,
+ MemoryReservationManager memoryReservationManager) {
GroupedAccumulator result;
if (aggregationType == TAggregationType.UDAF) {
@@ -197,7 +201,12 @@ public class AccumulatorFactory {
} else {
result =
createBuiltinGroupedAccumulator(
- aggregationType, inputDataTypes, inputExpressions,
inputAttributes, ascending);
+ aggregationType,
+ inputDataTypes,
+ inputExpressions,
+ inputAttributes,
+ ascending,
+ memoryReservationManager);
}
if (distinct) {
@@ -242,7 +251,8 @@ public class AccumulatorFactory {
List<TSDataType> inputDataTypes,
List<Expression> inputExpressions,
Map<String, String> inputAttributes,
- boolean ascending) {
+ boolean ascending,
+ MemoryReservationManager memoryReservationManager) {
switch (aggregationType) {
case COUNT:
return new GroupedCountAccumulator();
@@ -326,6 +336,8 @@ public class AccumulatorFactory {
case KURTOSIS:
return new GroupedCentralMomentAccumulator(
inputDataTypes.get(0),
CentralMomentAccumulator.MomentType.KURTOSIS);
+ case PERCENTILE:
+ return new GroupedPercentileAccumulator(inputDataTypes.get(0),
memoryReservationManager);
default:
throw new IllegalArgumentException(
CalcMessages.INVALID_AGGREGATION_FUNCTION + aggregationType);
@@ -333,7 +345,9 @@ public class AccumulatorFactory {
}
public static TableAccumulator createBuiltinAccumulator(
- TAggregationType aggregationType, List<TSDataType> inputDataTypes) {
+ TAggregationType aggregationType,
+ List<TSDataType> inputDataTypes,
+ MemoryReservationManager memoryReservationManager) {
switch (aggregationType) {
case COUNT:
return new CountAccumulator();
@@ -418,6 +432,8 @@ public class AccumulatorFactory {
case KURTOSIS:
return new TableCentralMomentAccumulator(
inputDataTypes.get(0),
CentralMomentAccumulator.MomentType.KURTOSIS);
+ case PERCENTILE:
+ return new PercentileAccumulator(inputDataTypes.get(0),
memoryReservationManager);
default:
throw new IllegalArgumentException(
CalcMessages.INVALID_AGGREGATION_FUNCTION + aggregationType);
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/PercentileAccumulator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/PercentileAccumulator.java
new file mode 100644
index 00000000000..4a6b075d0c4
--- /dev/null
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/PercentileAccumulator.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.calc.execution.operator.source.relational.Percentile;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.exception.SemanticException;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.nio.ByteBuffer;
+
+public class PercentileAccumulator implements TableAccumulator {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(PercentileAccumulator.class);
+
+ private final TSDataType seriesDataType;
+ private Percentile percentile = new Percentile();
+ // percentage is a query-level constant; it is read once from the first
input/intermediate and
+ // kept fixed afterwards, so it never gets reset to 0 by a later all-null
batch.
+ private double percentage;
+ private boolean percentageInitialized;
+
+ private final MemoryReservationManager memoryReservationManager;
+ private long previousPercentileSize;
+
+ public PercentileAccumulator(
+ TSDataType seriesDataType, MemoryReservationManager
memoryReservationManager) {
+ this.seriesDataType = seriesDataType;
+ this.memoryReservationManager = memoryReservationManager;
+ updateMemoryReservation();
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE + percentile.getEstimatedSize();
+ }
+
+ @Override
+ public TableAccumulator copy() {
+ return new PercentileAccumulator(seriesDataType, memoryReservationManager);
+ }
+
+ @Override
+ public void addInput(Column[] arguments, AggregationMask mask) {
+ if (arguments.length != 2) {
+ throw new SemanticException(
+ String.format("PERCENTILE requires 2 arguments, but got %d",
arguments.length));
+ }
+ if (!percentageInitialized) {
+ percentage = arguments[1].getDouble(0);
+ percentageInitialized = true;
+ }
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(arguments[0], mask);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ addLongInput(arguments[0], mask);
+ break;
+ case FLOAT:
+ addFloatInput(arguments[0], mask);
+ break;
+ case DOUBLE:
+ addDoubleInput(arguments[0], mask);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in Percentile Aggregation:
%s", seriesDataType));
+ }
+ updateMemoryReservation();
+ }
+
+ @Override
+ public void addIntermediate(Column argument) {
+ for (int i = 0; i < argument.getPositionCount(); i++) {
+ if (!argument.isNull(i)) {
+ byte[] data = argument.getBinary(i).getValues();
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ // Always consume the leading 8 bytes so the buffer position is
correct for deserialize,
+ // but only keep the percentage once: every partial carries the same
query-level constant.
+ double serializedPercentage = ReadWriteIOUtils.readDouble(buffer);
+ if (!percentageInitialized) {
+ percentage = serializedPercentage;
+ percentageInitialized = true;
+ }
+ percentile.merge(Percentile.deserialize(buffer));
+ }
+ }
+ updateMemoryReservation();
+ }
+
+ @Override
+ public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+ int percentileDataLength = percentile.getSerializedSize();
+ // Use long arithmetic to avoid integer overflow
+ ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(8L +
percentileDataLength));
+ ReadWriteIOUtils.write(percentage, buffer);
+ percentile.serialize(buffer);
+ columnBuilder.writeBinary(new Binary(buffer.array()));
+ }
+
+ @Override
+ public void evaluateFinal(ColumnBuilder columnBuilder) {
+ double result = percentile.getPercentile(percentage);
+ if (Double.isNaN(result)) {
+ columnBuilder.appendNull();
+ return;
+ }
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt((int) result);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ columnBuilder.writeLong((long) result);
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat((float) result);
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(result);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in PERCENTILE Aggregation:
%s", seriesDataType));
+ }
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public void addStatistics(Statistics[] statistics) {
+ throw new UnsupportedOperationException("PercentileAccumulator does not
support statistics");
+ }
+
+ @Override
+ public void reset() {
+ percentile = new Percentile();
+ percentageInitialized = false;
+ updateMemoryReservation();
+ }
+
+ private void updateMemoryReservation() {
+ long currentSize = percentile.getEstimatedSize();
+ long delta = currentSize - previousPercentileSize;
+ if (delta > 0) {
+ memoryReservationManager.reserveMemoryCumulatively(delta);
+ } else if (delta < 0) {
+ memoryReservationManager.releaseMemoryCumulatively(-delta);
+ }
+ previousPercentileSize = currentSize;
+ }
+
+ private void addIntInput(Column column, AggregationMask mask) {
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ if (!column.isNull(i)) {
+ percentile.addValue(column.getInt(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!column.isNull(position)) {
+ percentile.addValue(column.getInt(position));
+ }
+ }
+ }
+ }
+
+ private void addLongInput(Column column, AggregationMask mask) {
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ if (!column.isNull(i)) {
+ percentile.addValue(column.getLong(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!column.isNull(position)) {
+ percentile.addValue(column.getLong(position));
+ }
+ }
+ }
+ }
+
+ private void addFloatInput(Column column, AggregationMask mask) {
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ if (!column.isNull(i)) {
+ percentile.addValue(column.getFloat(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!column.isNull(position)) {
+ percentile.addValue(column.getFloat(position));
+ }
+ }
+ }
+ }
+
+ private void addDoubleInput(Column column, AggregationMask mask) {
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ if (!column.isNull(i)) {
+ percentile.addValue(column.getDouble(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!column.isNull(position)) {
+ percentile.addValue(column.getDouble(position));
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedPercentileAccumulator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedPercentileAccumulator.java
new file mode 100644
index 00000000000..79b9017d1d2
--- /dev/null
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedPercentileAccumulator.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped;
+
+import org.apache.iotdb.calc.execution.operator.source.relational.Percentile;
+import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask;
+import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.array.PercentileBigArray;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.exception.SemanticException;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.nio.ByteBuffer;
+
+public class GroupedPercentileAccumulator implements GroupedAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(GroupedPercentileAccumulator.class);
+ private final TSDataType seriesDataType;
+ // percentage is a query-level constant; it is read once from the first
input/intermediate and
+ // kept fixed afterwards, so it never gets reset to 0 by a later all-null
batch.
+ private double percentage;
+ private boolean percentageInitialized;
+ private final MemoryReservationManager memoryReservationManager;
+ private long previousArraySize;
+ private final PercentileBigArray array = new PercentileBigArray();
+
+ public GroupedPercentileAccumulator(
+ TSDataType seriesDataType, MemoryReservationManager
memoryReservationManager) {
+ this.seriesDataType = seriesDataType;
+ this.memoryReservationManager = memoryReservationManager;
+ updateMemoryReservation();
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE + array.sizeOf();
+ }
+
+ @Override
+ public void setGroupCount(long groupCount) {
+ array.ensureCapacity(groupCount);
+ }
+
+ @Override
+ public void addInput(int[] groupIds, Column[] arguments, AggregationMask
mask) {
+ if (arguments.length != 2) {
+ throw new SemanticException(
+ String.format("PERCENTILE requires 2 arguments, but got %d",
arguments.length));
+ }
+ if (!percentageInitialized) {
+ percentage = arguments[1].getDouble(0);
+ percentageInitialized = true;
+ }
+
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(groupIds, arguments, mask);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ addLongInput(groupIds, arguments, mask);
+ break;
+ case FLOAT:
+ addFloatInput(groupIds, arguments, mask);
+ break;
+ case DOUBLE:
+ addDoubleInput(groupIds, arguments, mask);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in PERCENTILE Aggregation:
%s", seriesDataType));
+ }
+ updateMemoryReservation();
+ }
+
+ @Override
+ public void addIntermediate(int[] groupIds, Column argument) {
+ for (int i = 0; i < groupIds.length; i++) {
+ int groupId = groupIds[i];
+ if (!argument.isNull(i)) {
+ byte[] data = argument.getBinary(i).getValues();
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ // Always consume the leading 8 bytes so the buffer position is
correct for deserialize,
+ // but only keep the percentage once: every partial carries the same
query-level constant.
+ double serializedPercentage = ReadWriteIOUtils.readDouble(buffer);
+ if (!percentageInitialized) {
+ percentage = serializedPercentage;
+ percentageInitialized = true;
+ }
+ Percentile other = Percentile.deserialize(buffer);
+ array.get(groupId).merge(other);
+ }
+ }
+ updateMemoryReservation();
+ }
+
+ @Override
+ public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
+ Percentile percentile = array.get(groupId);
+ int percentileDataLength = percentile.getSerializedSize();
+ // Use long arithmetic to avoid integer overflow
+ ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(8L +
percentileDataLength));
+ ReadWriteIOUtils.write(percentage, buffer);
+ percentile.serialize(buffer);
+ columnBuilder.writeBinary(new Binary(buffer.array()));
+ }
+
+ @Override
+ public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+ Percentile percentile = array.get(groupId);
+ double result = percentile.getPercentile(percentage);
+ if (Double.isNaN(result)) {
+ columnBuilder.appendNull();
+ return;
+ }
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt((int) result);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ columnBuilder.writeLong((long) result);
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat((float) result);
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(result);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in PERCENTILE Aggregation:
%s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void prepareFinal() {}
+
+ @Override
+ public void reset() {
+ array.reset();
+ percentageInitialized = false;
+ updateMemoryReservation();
+ }
+
+ private void updateMemoryReservation() {
+ long currentSize = array.sizeOf();
+ long delta = currentSize - previousArraySize;
+ if (delta > 0) {
+ memoryReservationManager.reserveMemoryCumulatively(delta);
+ } else if (delta < 0) {
+ memoryReservationManager.releaseMemoryCumulatively(-delta);
+ }
+ previousArraySize = currentSize;
+ }
+
+ public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask
mask) {
+ Column valueColumn = arguments[0];
+
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ int groupId = groupIds[i];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(i)) {
+ percentile.addValue(valueColumn.getInt(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ int groupId;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ groupId = groupIds[position];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(position)) {
+ percentile.addValue(valueColumn.getInt(position));
+ }
+ }
+ }
+ }
+
+ public void addLongInput(int[] groupIds, Column[] arguments, AggregationMask
mask) {
+ Column valueColumn = arguments[0];
+
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ int groupId = groupIds[i];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(i)) {
+ percentile.addValue(valueColumn.getLong(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ int groupId;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ groupId = groupIds[position];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(position)) {
+ percentile.addValue(valueColumn.getLong(position));
+ }
+ }
+ }
+ }
+
+ public void addFloatInput(int[] groupIds, Column[] arguments,
AggregationMask mask) {
+ Column valueColumn = arguments[0];
+
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ int groupId = groupIds[i];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(i)) {
+ percentile.addValue(valueColumn.getFloat(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ int groupId;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ groupId = groupIds[position];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(position)) {
+ percentile.addValue(valueColumn.getFloat(position));
+ }
+ }
+ }
+ }
+
+ public void addDoubleInput(int[] groupIds, Column[] arguments,
AggregationMask mask) {
+ Column valueColumn = arguments[0];
+
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ int groupId = groupIds[i];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(i)) {
+ percentile.addValue(valueColumn.getDouble(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ int groupId;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ groupId = groupIds[position];
+ Percentile percentile = array.get(groupId);
+ if (!valueColumn.isNull(position)) {
+ percentile.addValue(valueColumn.getDouble(position));
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/array/PercentileBigArray.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/array/PercentileBigArray.java
new file mode 100644
index 00000000000..7dad32c43d9
--- /dev/null
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/array/PercentileBigArray.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.array;
+
+import org.apache.iotdb.calc.execution.operator.source.relational.Percentile;
+
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOf;
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
+
+public final class PercentileBigArray {
+ private static final long INSTANCE_SIZE =
shallowSizeOfInstance(PercentileBigArray.class);
+ private final ObjectBigArray<Percentile> array;
+
+ public PercentileBigArray() {
+ array = new ObjectBigArray<>();
+ }
+
+ /**
+ * Unlike fixed-size sketches (e.g. TDigest), each {@link Percentile} stores
all raw values and
+ * grows unboundedly as values are appended through {@link #get(long)}.
Caching the retained size
+ * and only refreshing it on {@code set} would therefore drift far below the
real footprint, so we
+ * sum the live estimated size of every Percentile on demand to keep memory
accounting accurate.
+ */
+ public long sizeOf() {
+ long[] sizeOfPercentile = {0};
+ array.forEach(
+ item -> {
+ if (item != null) {
+ sizeOfPercentile[0] += item.getEstimatedSize();
+ }
+ });
+ return INSTANCE_SIZE + shallowSizeOf(array) + sizeOfPercentile[0];
+ }
+
+ public Percentile get(long index) {
+ Percentile percentile = array.get(index);
+ if (percentile == null) {
+ percentile = new Percentile();
+ array.set(index, percentile);
+ }
+ return percentile;
+ }
+
+ public void ensureCapacity(long length) {
+ array.ensureCapacity(length);
+ }
+
+ public void reset() {
+ array.forEach(
+ item -> {
+ if (item != null) {
+ item.clear();
+ }
+ });
+ }
+}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index c5bf2e25094..bfac196461e 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -95,6 +95,7 @@ import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.gr
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.StreamingHashAggregationOperator;
import org.apache.iotdb.calc.execution.relational.ColumnTransformerBuilder;
import org.apache.iotdb.calc.i18n.CalcMessages;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.calc.plan.relational.metadata.ITypeMetadata;
import org.apache.iotdb.calc.plan.relational.planner.CastToBlobLiteralVisitor;
import
org.apache.iotdb.calc.plan.relational.planner.CastToBooleanLiteralVisitor;
@@ -1327,7 +1328,8 @@ public abstract class TableOperatorGenerator<
true,
false,
null,
- Collections.emptySet())));
+ Collections.emptySet(),
+ operatorContext.getMemoryReservationContext())));
return createAggregationOperator(operatorContext, child,
aggregatorBuilder.build());
}
@@ -1341,7 +1343,8 @@ public abstract class TableOperatorGenerator<
boolean scanAscending,
boolean isAggTableScan,
String timeColumnName,
- Set<String> measurementColumnNames) {
+ Set<String> measurementColumnNames,
+ MemoryReservationManager memoryReservationManager) {
List<Integer> argumentChannels = new ArrayList<>();
for (Expression argument : aggregation.getArguments()) {
Symbol argumentSymbol = Symbol.from(argument);
@@ -1364,7 +1367,8 @@ public abstract class TableOperatorGenerator<
isAggTableScan,
timeColumnName,
measurementColumnNames,
- aggregation.isDistinct());
+ aggregation.isDistinct(),
+ memoryReservationManager);
OptionalInt maskChannel = OptionalInt.empty();
if (aggregation.hasMask()) {
@@ -1406,7 +1410,8 @@ public abstract class TableOperatorGenerator<
true,
false,
null,
- Collections.emptySet())));
+ Collections.emptySet(),
+ context.getMemoryReservationManager())));
CommonOperatorContext operatorContext =
addOperatorContext(
@@ -1428,7 +1433,13 @@ public abstract class TableOperatorGenerator<
.forEach(
(k, v) ->
aggregatorBuilder.add(
- buildGroupByAggregator(childLayout, k, v,
node.getStep(), typeProvider)));
+ buildGroupByAggregator(
+ childLayout,
+ k,
+ v,
+ node.getStep(),
+ typeProvider,
+ context.getMemoryReservationManager())));
Set<Symbol> preGroupedKeys =
ImmutableSet.copyOf(node.getPreGroupedSymbols());
List<Symbol> groupingKeys = node.getGroupingKeys();
@@ -1479,7 +1490,13 @@ public abstract class TableOperatorGenerator<
.forEach(
(k, v) ->
aggregatorBuilder.add(
- buildGroupByAggregator(childLayout, k, v, node.getStep(),
typeProvider)));
+ buildGroupByAggregator(
+ childLayout,
+ k,
+ v,
+ node.getStep(),
+ typeProvider,
+ context.getMemoryReservationManager())));
CommonOperatorContext operatorContext =
addOperatorContext(
context, node.getPlanNodeId(),
HashAggregationOperator.class.getSimpleName());
@@ -1603,7 +1620,8 @@ public abstract class TableOperatorGenerator<
Symbol symbol,
AggregationNode.Aggregation aggregation,
AggregationNode.Step step,
- ITableTypeProvider typeProvider) {
+ ITableTypeProvider typeProvider,
+ MemoryReservationManager memoryReservationManager) {
List<Integer> argumentChannels = new ArrayList<>();
for (Expression argument : aggregation.getArguments()) {
Symbol argumentSymbol = Symbol.from(argument);
@@ -1623,7 +1641,8 @@ public abstract class TableOperatorGenerator<
Collections.emptyList(),
Collections.emptyMap(),
true,
- aggregation.isDistinct());
+ aggregation.isDistinct(),
+ memoryReservationManager);
OptionalInt maskChannel = OptionalInt.empty();
if (aggregation.hasMask()) {
@@ -1718,7 +1737,8 @@ public abstract class TableOperatorGenerator<
ResolvedFunction resolvedFunction,
List<Map.Entry<Expression, Type>> arguments,
List<Integer> argumentChannels,
- PatternAggregationTracker patternAggregationTracker) {
+ PatternAggregationTracker patternAggregationTracker,
+ MemoryReservationManager memoryReservationManager) {
String functionName = resolvedFunction.getSignature().getName();
List<TSDataType> originalArgumentTypes =
resolvedFunction.getSignature().getArgumentTypes().stream()
@@ -1726,7 +1746,10 @@ public abstract class TableOperatorGenerator<
.collect(Collectors.toList());
TableAccumulator accumulator =
- createBuiltinAccumulator(getAggregationTypeByFuncName(functionName),
originalArgumentTypes);
+ createBuiltinAccumulator(
+ getAggregationTypeByFuncName(functionName),
+ originalArgumentTypes,
+ memoryReservationManager);
BoundSignature signature = resolvedFunction.getSignature();
@@ -1887,7 +1910,11 @@ public abstract class TableOperatorGenerator<
PatternAggregator variableRecognizerAggregator =
buildPatternAggregator(
- resolvedFunction, arguments, valueChannels,
patternAggregationTracker);
+ resolvedFunction,
+ arguments,
+ valueChannels,
+ patternAggregationTracker,
+ context.getMemoryReservationManager());
variableRecognizerAggregatorBuilder.add(variableRecognizerAggregator);
@@ -1978,7 +2005,11 @@ public abstract class TableOperatorGenerator<
PatternAggregator measurePatternAggregator =
buildPatternAggregator(
- resolvedFunction, arguments, valueChannels,
patternAggregationTracker);
+ resolvedFunction,
+ arguments,
+ valueChannels,
+ patternAggregationTracker,
+ context.getMemoryReservationManager());
measurePatternAggregatorBuilder.add(measurePatternAggregator);
@@ -2216,7 +2247,12 @@ public abstract class TableOperatorGenerator<
FunctionKind functionKind = resolvedFunction.getFunctionKind();
if (functionKind == FunctionKind.AGGREGATE) {
WindowAggregator tableWindowAggregator =
- buildWindowAggregator(symbol, function, typeProvider,
argumentChannels);
+ buildWindowAggregator(
+ symbol,
+ function,
+ typeProvider,
+ argumentChannels,
+ context.getMemoryReservationManager());
windowFunction = new AggregationWindowFunction(tableWindowAggregator);
} else if (functionKind == FunctionKind.WINDOW) {
String functionName =
function.getResolvedFunction().getSignature().getName();
@@ -2261,7 +2297,8 @@ public abstract class TableOperatorGenerator<
Symbol symbol,
WindowNode.Function function,
ITableTypeProvider typeProvider,
- List<Integer> argumentChannels) {
+ List<Integer> argumentChannels,
+ MemoryReservationManager memoryReservationManager) {
// Create accumulator first
String functionName =
function.getResolvedFunction().getSignature().getName();
List<TSDataType> originalArgumentTypes =
@@ -2269,7 +2306,10 @@ public abstract class TableOperatorGenerator<
.map(InternalTypeManager::getTSDataType)
.collect(Collectors.toList());
TableAccumulator accumulator =
- createBuiltinAccumulator(getAggregationTypeByFuncName(functionName),
originalArgumentTypes);
+ createBuiltinAccumulator(
+ getAggregationTypeByFuncName(functionName),
+ originalArgumentTypes,
+ memoryReservationManager);
// Create aggregator by accumulator
return new WindowAggregator(
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/constant/SqlConstant.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/constant/SqlConstant.java
index 9d015cb3ed1..d542528ec1e 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/constant/SqlConstant.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/constant/SqlConstant.java
@@ -88,6 +88,7 @@ public class SqlConstant {
public static final String APPROX_COUNT_DISTINCT = "approx_count_distinct";
public static final String APPROX_MOST_FREQUENT = "approx_most_frequent";
public static final String APPROX_PERCENTILE = "approx_percentile";
+ public static final String PERCENTILE = "percentile";
// names of scalar functions
public static final String DIFF = "diff";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index bd66da8daaa..185c0d12c06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -1463,7 +1463,8 @@ public class DataNodeTableOperatorGenerator
scanAscending,
true,
timeColumnName,
- measurementColumnsIndexMap.keySet()));
+ measurementColumnsIndexMap.keySet(),
+ context.getMemoryReservationManager()));
}
ITableTimeRangeIterator timeRangeIterator = null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index 2f0f12b9f8c..c023e69a374 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -76,6 +76,8 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.calc.plan.relational.metadata.CommonMetadataUtils.isDecimalType;
+import static
org.apache.iotdb.calc.plan.relational.metadata.CommonMetadataUtils.isNumericType;
import static
org.apache.iotdb.calc.transformation.dag.column.FailFunctionColumnTransformer.FAIL_FUNCTION_NAME;
import static org.apache.tsfile.read.common.type.BlobType.BLOB;
import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
@@ -217,7 +219,7 @@ public class TableMetadataImpl implements Metadata {
if
(TableBuiltinScalarFunction.DIFF.getFunctionName().equalsIgnoreCase(functionName))
{
if (!CommonMetadataUtils.isOneNumericType(argumentTypes)
&& !(argumentTypes.size() == 2
- && CommonMetadataUtils.isNumericType(argumentTypes.get(0))
+ && isNumericType(argumentTypes.get(0))
&& BOOLEAN.equals(argumentTypes.get(1)))) {
throw new SemanticException(
"Scalar function "
@@ -1265,7 +1267,7 @@ public class TableMetadataImpl implements Metadata {
}
Type valueColumnType = argumentTypes.get(0);
- if (!CommonMetadataUtils.isNumericType(valueColumnType)) {
+ if (!isNumericType(valueColumnType)) {
throw new SemanticException(
String.format(
"Aggregation functions [%s] should have value column as
numeric type [INT32, INT64, FLOAT, DOUBLE, TIMESTAMP]",
@@ -1273,7 +1275,7 @@ public class TableMetadataImpl implements Metadata {
}
Type percentageType = argumentTypes.get(argumentSize - 1);
- if (!CommonMetadataUtils.isDecimalType(percentageType)) {
+ if (!isDecimalType(percentageType)) {
throw new SemanticException(
String.format(
"Aggregation functions [%s] should have percentage as
decimal type",
@@ -1288,7 +1290,26 @@ public class TableMetadataImpl implements Metadata {
functionName, weightType.getDisplayName()));
}
}
+ break;
+ case SqlConstant.PERCENTILE:
+ if (argumentTypes.size() != 2) {
+ throw new SemanticException(
+ String.format(
+ "Aggregation functions [%s] should only have two arguments",
functionName));
+ }
+ if (!isNumericType(argumentTypes.get(0))) {
+ throw new SemanticException(
+ String.format(
+ "Aggregation functions [%s] should have value column as
numeric type [INT32, INT64, FLOAT, DOUBLE, TIMESTAMP]",
+ functionName));
+ }
+ if (!isDecimalType(argumentTypes.get(1))) {
+ throw new SemanticException(
+ String.format(
+ "Aggregation functions [%s] should have percentage as
decimal type",
+ functionName));
+ }
break;
case SqlConstant.COUNT:
break;
@@ -1314,6 +1335,7 @@ public class TableMetadataImpl implements Metadata {
case SqlConstant.MAX_BY:
case SqlConstant.MIN_BY:
case SqlConstant.APPROX_PERCENTILE:
+ case SqlConstant.PERCENTILE:
return argumentTypes.get(0);
case SqlConstant.AVG:
case SqlConstant.SUM:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index b32efbbccf4..3a04e191be7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -318,6 +318,7 @@ import static java.util.stream.Collectors.toList;
import static
org.apache.iotdb.calc.utils.constant.SqlConstant.APPROX_COUNT_DISTINCT;
import static
org.apache.iotdb.calc.utils.constant.SqlConstant.APPROX_MOST_FREQUENT;
import static
org.apache.iotdb.calc.utils.constant.SqlConstant.APPROX_PERCENTILE;
+import static org.apache.iotdb.calc.utils.constant.SqlConstant.PERCENTILE;
import static
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.AnchorPattern.Type.PARTITION_END;
import static
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.AnchorPattern.Type.PARTITION_START;
import static
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.GroupingSets.Type.CUBE;
@@ -3687,6 +3688,11 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
throw new SemanticException(
"The third argument of 'approx_percentile' function percentage
must be a double literal");
}
+ } else if (name.toString().equalsIgnoreCase(PERCENTILE)) {
+ if (arguments.size() == 2 && !(arguments.get(1) instanceof
DoubleLiteral)) {
+ throw new SemanticException(
+ "The second argument of 'percentile' function percentage must be a
double literal");
+ }
}
return new FunctionCall(getLocation(ctx), name, window, nulls, distinct,
mode, arguments);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
index 9da867894fc..17bc22b2a0a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.calc.execution.operator.process.window.partition.frame.F
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AccumulatorFactory;
import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAccumulator;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -101,7 +102,9 @@ public class FunctionTestUtils {
// inputExpressions and inputAttributes are not used in this method
TableAccumulator accumulator =
AccumulatorFactory.createBuiltinAccumulator(
- aggregationType, Collections.singletonList(inputDataType));
+ aggregationType,
+ Collections.singletonList(inputDataType),
+ new FakedMemoryReservationManager());
WindowAggregator aggregator =
new WindowAggregator(accumulator, outputDataType,
Collections.singletonList(0));
return new AggregationWindowFunction(aggregator);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java
index 9dc5ecce863..872d755a5ae 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.Ta
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
import
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolReference;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
import org.apache.tsfile.enums.TSDataType;
import org.junit.Test;
@@ -161,7 +162,8 @@ public class AggregationTableScanTest {
isAggTableScan,
TIME_COL,
measurementColumnNames,
- distinct);
+ distinct,
+ new FakedMemoryReservationManager());
String msg =
String.format(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
index 424eb75118c..d40c73dbf6a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
@@ -439,7 +439,8 @@ public class AggregationCornerCaseTest {
Collections.emptyList(),
Collections.emptyMap(),
true,
- false),
+ false,
+ operatorContext.getMemoryReservationContext()),
AggregationNode.Step.SINGLE,
TSDataType.INT32,
ImmutableList.of(1, 0),
@@ -453,7 +454,8 @@ public class AggregationCornerCaseTest {
Collections.emptyList(),
Collections.emptyMap(),
true,
- false),
+ false,
+ operatorContext.getMemoryReservationContext()),
AggregationNode.Step.SINGLE,
TSDataType.INT32,
ImmutableList.of(1, 0),
@@ -467,7 +469,8 @@ public class AggregationCornerCaseTest {
Collections.emptyList(),
Collections.emptyMap(),
true,
- false),
+ false,
+ operatorContext.getMemoryReservationContext()),
AggregationNode.Step.SINGLE,
TSDataType.DOUBLE,
ImmutableList.of(1),
@@ -481,7 +484,8 @@ public class AggregationCornerCaseTest {
Collections.emptyList(),
Collections.emptyMap(),
true,
- false),
+ false,
+ operatorContext.getMemoryReservationContext()),
AggregationNode.Step.SINGLE,
TSDataType.INT32,
ImmutableList.of(1),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
index d1ff1f06a5d..e8c5806fbc5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
@@ -66,7 +66,8 @@ public enum TableBuiltinAggregationFunction {
REGR_SLOPE("regr_slope"),
REGR_INTERCEPT("regr_intercept"),
SKEWNESS("skewness"),
- KURTOSIS("kurtosis");
+ KURTOSIS("kurtosis"),
+ PERCENTILE("percentile");
private final String functionName;
@@ -120,6 +121,7 @@ public enum TableBuiltinAggregationFunction {
case "kurtosis":
case "approx_count_distinct":
case "approx_percentile":
+ case "percentile":
return RowType.anonymous(Collections.emptyList());
case "extreme":
case "max":
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index c825d8900af..a107b0259ff 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -317,6 +317,7 @@ enum TAggregationType {
REGR_INTERCEPT,
SKEWNESS,
KURTOSIS
+ PERCENTILE,
}
struct TShowConfigurationTemplateResp {