This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 923bb2c347c Add APPROX_COUNT_DISTINCT Function
923bb2c347c is described below
commit 923bb2c347c53e7f5949da93237427880c3c7751
Author: FearfulTomcat27 <[email protected]>
AuthorDate: Fri Apr 25 11:19:40 2025 +0800
Add APPROX_COUNT_DISTINCT Function
---
.../it/query/recent/IoTDBTableAggregationIT.java | 69 +++++
.../relational/aggregation/AccumulatorFactory.java | 5 +
.../ApproxCountDistinctAccumulator.java | 265 +++++++++++++++++
.../source/relational/aggregation/HyperLogLog.java | 246 ++++++++++++++++
.../aggregation/HyperLogLogStateFactory.java | 88 ++++++
.../GroupedApproxCountDistinctAccumulator.java | 314 +++++++++++++++++++++
.../aggregation/grouped/array/BinaryBigArray.java | 4 -
.../grouped/array/HyperLogLogBigArray.java | 84 ++++++
.../aggregation/grouped/array/MapBigArray.java | 4 -
.../aggregation/grouped/array/ObjectBigArray.java | 12 +-
.../relational/metadata/TableMetadataImpl.java | 15 +
.../plan/relational/sql/parser/AstBuilder.java | 9 +
.../iotdb/db/utils/constant/SqlConstant.java | 2 +
.../TableBuiltinAggregationFunction.java | 2 +
.../thrift-commons/src/main/thrift/common.thrift | 3 +-
15 files changed, 1102 insertions(+), 20 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index cf6f0217c95..274868fdfc6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -4106,6 +4106,59 @@ public class IoTDBTableAggregationIT {
DATABASE_NAME);
}
+ @Test
+ public void approxCountDistinctTest() {
+ String[] expectedHeader = buildHeaders(17);
+ String[] retArray = new String[]
{"10,2,2,4,16,2,2,5,5,5,5,2,24,32,5,10,1,"};
+ tableResultSetEqualTest(
+ "select approx_count_distinct(time), approx_count_distinct(province),
approx_count_distinct(city), approx_count_distinct(region),
approx_count_distinct(device_id), approx_count_distinct(color),
approx_count_distinct(type), approx_count_distinct(s1),
approx_count_distinct(s2), approx_count_distinct(s3),
approx_count_distinct(s4), approx_count_distinct(s5),
approx_count_distinct(s6), approx_count_distinct(s7),
approx_count_distinct(s8), approx_count_distinct(s9), approx_count_disti [...]
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ tableResultSetEqualTest(
+ "select approx_count_distinct(time, 0.02),
approx_count_distinct(province, 0.02), approx_count_distinct(city, 0.02),
approx_count_distinct(region, 0.02), approx_count_distinct(device_id, 0.02),
approx_count_distinct(color, 0.02), approx_count_distinct(type, 0.02),
approx_count_distinct(s1, 0.02), approx_count_distinct(s2, 0.02),
approx_count_distinct(s3, 0.02), approx_count_distinct(s4, 0.02),
approx_count_distinct(s5, 0.02), approx_count_distinct(s6, 0.02),
approx_count_distinct [...]
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ retArray =
+ new String[] {
+ "2024-09-24T06:15:30.000Z,beijing,2,2,",
+ "2024-09-24T06:15:31.000Z,beijing,0,0,",
+ "2024-09-24T06:15:35.000Z,beijing,2,2,",
+ "2024-09-24T06:15:36.000Z,beijing,2,4,",
+ "2024-09-24T06:15:40.000Z,beijing,0,4,",
+ "2024-09-24T06:15:41.000Z,beijing,2,0,",
+ "2024-09-24T06:15:46.000Z,beijing,0,2,",
+ "2024-09-24T06:15:50.000Z,beijing,0,2,",
+ "2024-09-24T06:15:51.000Z,beijing,2,0,",
+ "2024-09-24T06:15:55.000Z,beijing,2,0,",
+ "2024-09-24T06:15:30.000Z,shanghai,2,2,",
+ "2024-09-24T06:15:31.000Z,shanghai,0,0,",
+ "2024-09-24T06:15:35.000Z,shanghai,2,2,",
+ "2024-09-24T06:15:36.000Z,shanghai,2,4,",
+ "2024-09-24T06:15:40.000Z,shanghai,0,4,",
+ "2024-09-24T06:15:41.000Z,shanghai,2,0,",
+ "2024-09-24T06:15:46.000Z,shanghai,0,2,",
+ "2024-09-24T06:15:50.000Z,shanghai,0,2,",
+ "2024-09-24T06:15:51.000Z,shanghai,2,0,",
+ "2024-09-24T06:15:55.000Z,shanghai,2,0,",
+ };
+
+ tableResultSetEqualTest(
+ "select
time,province,approx_count_distinct(s6),approx_count_distinct(s7) from table1
group by 1,2 order by 2,1",
+ new String[] {"time", "province", "_col2", "_col3"},
+ retArray,
+ DATABASE_NAME);
+
+ tableResultSetEqualTest(
+ "select
time,province,approx_count_distinct(s6,0.02),approx_count_distinct(s7,0.02)
from table1 group by 1,2 order by 2,1",
+ new String[] {"time", "province", "_col2", "_col3"},
+ retArray,
+ DATABASE_NAME);
+ }
+
@Test
public void exceptionTest() {
tableAssertTestFail(
@@ -4136,6 +4189,22 @@ public class IoTDBTableAggregationIT {
"select last_by() from table1",
"701: Aggregate functions [last_by] should only have two or three
arguments",
DATABASE_NAME);
+ tableAssertTestFail(
+ "select approx_count_distinct() from table1",
+ "701: Aggregate functions [approx_count_distinct] should only have two
arguments",
+ DATABASE_NAME);
+ tableAssertTestFail(
+ "select approx_count_distinct(province, 0.3) from table1",
+ "750: Max Standard Error must be in [0.0040625, 0.26]: 0.3",
+ DATABASE_NAME);
+ tableAssertTestFail(
+ "select approx_count_distinct(province, 0.3) from table1",
+ "750: Max Standard Error must be in [0.0040625, 0.26]: 0.3",
+ DATABASE_NAME);
+ tableAssertTestFail(
+ "select approx_count_distinct(province, 'test') from table1",
+ "701: Second argument of Aggregate functions [approx_count_distinct]
should be numberic type and do not use expression",
+ DATABASE_NAME);
}
// ==================================================================
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index a9b4122375f..0c9edd7e735 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.udf.utils.TableUDFUtils;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
import
org.apache.iotdb.db.queryengine.execution.aggregation.VarianceAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxCountDistinctAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountIfAccumulator;
@@ -240,6 +241,8 @@ public class AccumulatorFactory {
case VAR_POP:
return new GroupedVarianceAccumulator(
inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
+ case APPROX_COUNT_DISTINCT:
+ return new
GroupedApproxCountDistinctAccumulator(inputDataTypes.get(0));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggregationType);
}
@@ -305,6 +308,8 @@ public class AccumulatorFactory {
case VAR_POP:
return new TableVarianceAccumulator(
inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
+ case APPROX_COUNT_DISTINCT:
+ return new ApproxCountDistinctAccumulator(inputDataTypes.get(0));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggregationType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java
new file mode 100644
index 00000000000..56f8ff43595
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
+
+public class ApproxCountDistinctAccumulator implements TableAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(ApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+ private final HyperLogLogStateFactory.SingleHyperLogLogState state =
+ HyperLogLogStateFactory.createSingleState();
+
+ private static final int DEFAULT_HYPERLOGLOG_BUCKET_SIZE = 2048;
+
+ public ApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE
+ + RamUsageEstimator.shallowSizeOfInstance(HyperLogLog.class)
+ + Integer.BYTES * DEFAULT_HYPERLOGLOG_BUCKET_SIZE;
+ }
+
+ @Override
+ public TableAccumulator copy() {
+ return new ApproxCountDistinctAccumulator(seriesDataType);
+ }
+
+ @Override
+ public void addInput(Column[] arguments, AggregationMask mask) {
+ double maxStandardError =
+ arguments.length == 1 ? DEFAULT_STANDARD_ERROR :
arguments[1].getDouble(0);
+ HyperLogLog hll = getOrCreateHyperLogLog(state, maxStandardError);
+
+ switch (seriesDataType) {
+ case INT32:
+ case DATE:
+ addIntInput(arguments[0], mask, hll);
+ return;
+ case INT64:
+ case TIMESTAMP:
+ addLongInput(arguments[0], mask, hll);
+ return;
+ case FLOAT:
+ addFloatInput(arguments[0], mask, hll);
+ return;
+ case DOUBLE:
+ addDoubleInput(arguments[0], mask, hll);
+ return;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ addBinaryInput(arguments[0], mask, hll);
+ return;
+ case BOOLEAN:
+ addBooleanInput(arguments[0], mask, hll);
+ return;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "Unsupported data type in APPROX_COUNT_DISTINCT Aggregation:
%s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void addIntermediate(Column argument) {
+
+ for (int i = 0; i < argument.getPositionCount(); i++) {
+ if (!argument.isNull(i)) {
+ HyperLogLog current = new
HyperLogLog(argument.getBinary(i).getValues());
+ state.merge(current);
+ }
+ }
+ }
+
+ @Override
+ public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+ checkArgument(
+ columnBuilder instanceof BinaryColumnBuilder,
+ "intermediate input and output of APPROX_COUNT_DISTINCT should be
BinaryColumn");
+ columnBuilder.writeBinary(new Binary(state.getHyperLogLog().serialize()));
+ }
+
+ @Override
+ public void evaluateFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeLong(state.getHyperLogLog().cardinality());
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public void addStatistics(Statistics[] statistics) {
+ throw new UnsupportedOperationException(
+ "ApproxCountDistinctAccumulator does not support statistics");
+ }
+
+ @Override
+ public void reset() {
+ state.getHyperLogLog().reset();
+ }
+
+ public void addBooleanInput(Column valueColumn, AggregationMask mask,
HyperLogLog hll) {
+ int positionCount = mask.getPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+ if (!valueColumn.isNull(i)) {
+ hll.add(valueColumn.getBoolean(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ hll.add(valueColumn.getBoolean(position));
+ }
+ }
+ }
+ }
+
+ public void addIntInput(Column valueColumn, AggregationMask mask,
HyperLogLog hll) {
+ int positionCount = mask.getPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+ if (!valueColumn.isNull(i)) {
+ hll.add(valueColumn.getInt(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ hll.add(valueColumn.getInt(position));
+ }
+ }
+ }
+ }
+
+ public void addLongInput(Column valueColumn, AggregationMask mask,
HyperLogLog hll) {
+ int positionCount = mask.getPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+ if (!valueColumn.isNull(i)) {
+ hll.add(valueColumn.getLong(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ hll.add(valueColumn.getLong(position));
+ }
+ }
+ }
+ }
+
+ public void addFloatInput(Column valueColumn, AggregationMask mask,
HyperLogLog hll) {
+ int positionCount = mask.getPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+ if (!valueColumn.isNull(i)) {
+ hll.add(valueColumn.getFloat(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ hll.add(valueColumn.getFloat(position));
+ }
+ }
+ }
+ }
+
+ public void addDoubleInput(Column valueColumn, AggregationMask mask,
HyperLogLog hll) {
+ int positionCount = mask.getPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+ if (!valueColumn.isNull(i)) {
+ hll.add(valueColumn.getDouble(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ hll.add(valueColumn.getDouble(position));
+ }
+ }
+ }
+ }
+
+ public void addBinaryInput(Column valueColumn, AggregationMask mask,
HyperLogLog hll) {
+ int positionCount = mask.getPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+ if (!valueColumn.isNull(i)) {
+ hll.add(valueColumn.getBinary(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ hll.add(valueColumn.getBinary(position));
+ }
+ }
+ }
+ }
+
+ public static HyperLogLog getOrCreateHyperLogLog(
+ HyperLogLogStateFactory.SingleHyperLogLogState state, double
maxStandardError) {
+ HyperLogLog hll = state.getHyperLogLog();
+ if (hll == null) {
+ hll = new HyperLogLog(maxStandardError);
+ state.setHyperLogLog(hll);
+ }
+ return hll;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java
new file mode 100644
index 00000000000..94836eba861
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers;
+ // Number of registers
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ // Alpha constant for bias correction
+ private final double alpha;
+
+ private static final HashFunction hashFunction = Hashing.murmur3_128();
+
+ public static final double DEFAULT_STANDARD_ERROR = 0.023;
+ private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625;
+ private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000;
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(HyperLogLog.class);
+
+ /**
+ * Constructs a HyperLogLog with the given precision.
+ *
+ * <p>The precision parameter (4 <= precision <= 16)
+ */
+ public HyperLogLog(double maxStandardError) {
+ int buckets = standardErrorToBuckets(maxStandardError);
+ int precision = indexBitLength(buckets);
+
+ this.b = precision;
+ // m = 2^precision, buckets
+ this.m = buckets;
+ this.registers = new int[m];
+
+ // Set alpha based on precision
+ this.alpha = getAlpha(precision, m);
+ }
+
+ public HyperLogLog(byte[] bytes) {
+ // deserialize
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ this.b = ReadWriteIOUtils.readInt(byteBuffer);
+ this.m = ReadWriteIOUtils.readInt(byteBuffer);
+
+ this.registers = new int[m];
+ for (int i = 0; i < m; i++) {
+ registers[i] = ReadWriteIOUtils.readInt(byteBuffer);
+ }
+ this.alpha = getAlpha(b, m);
+ }
+
+ private static double getAlpha(int precision, int m) {
+ switch (precision) {
+ case 4:
+ return 0.673;
+ case 5:
+ return 0.697;
+ case 6:
+ return 0.709;
+ default:
+ return 0.7213 / (1 + 1.079 / m);
+ }
+ }
+
+ private static boolean isPowerOf2(long value) {
+ Preconditions.checkArgument(value > 0L, "value must be positive");
+ return (value & value - 1L) == 0L;
+ }
+
+ private static int indexBitLength(int numberOfBuckets) {
+ Preconditions.checkArgument(
+ isPowerOf2(numberOfBuckets),
+ "numberOfBuckets must be a power of 2, actual: %s",
+ numberOfBuckets);
+ return Integer.numberOfTrailingZeros(numberOfBuckets);
+ }
+
+ private static int standardErrorToBuckets(double maxStandardError) {
+ if (maxStandardError <= LOWEST_MAX_STANDARD_ERROR
+ || maxStandardError >= HIGHEST_MAX_STANDARD_ERROR) {
+ throw new IoTDBRuntimeException(
+ String.format(
+ "Max Standard Error must be in [%s, %s]: %s",
+ LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR,
maxStandardError),
+ NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode(),
+ true);
+ }
+ return log2Ceiling((int) Math.ceil(1.04 / (maxStandardError *
maxStandardError)));
+ }
+
+ private static int log2Ceiling(int value) {
+ return Integer.highestOneBit(value - 1) << 1;
+ }
+
+ public void add(boolean value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(int value) {
+ offer(hashFunction.hashInt(value).asLong());
+ }
+
+ public void add(long value) {
+ offer(hashFunction.hashLong(value).asLong());
+ }
+
+ public void add(float value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(double value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(Binary value) {
+ offer(hashFunction.hashBytes(value.getValues()).asLong());
+ }
+
+ /**
+ * Adds a value to the estimator.
+ *
+ * <p>The value to add
+ */
+ public void offer(long hash) {
+ // Compute hash of the value
+
+ // Extract the first b bits for the register index
+ int idx = (int) (hash & (m - 1));
+
+ // Count the number of leading zeros in the remaining bits
+ // Add 1 to get the position of the leftmost 1
+
+ int leadingZeros = Long.numberOfTrailingZeros(hash >>> b) + 1;
+
+ // Update the register if the new value is larger
+ registers[idx] = Math.max(registers[idx], leadingZeros);
+ }
+
+ /**
+ * Returns the estimated cardinality of the data set.
+ *
+ * @return The estimated cardinality
+ */
+ public long cardinality() {
+ double sum = 0;
+ int zeros = 0;
+
+ // Compute the harmonic mean of 2^register[i]
+ for (int i = 0; i < m; i++) {
+ sum += 1.0 / (1 << registers[i]);
+ if (registers[i] == 0) {
+ zeros++;
+ }
+ }
+
+ // Apply bias correction formula
+ double estimate = alpha * m * m / sum;
+
+ // Small range correction
+ if (estimate <= 2.5 * m) {
+ if (zeros > 0) {
+ // Linear counting for small cardinalities
+ return Math.round(m * Math.log((double) m / zeros));
+ }
+ }
+
+ // Large range correction (for values > 2^32 / 30)
+ double maxCardinality = (double) (1L << 32);
+ if (estimate > maxCardinality / 30) {
+ return Math.round(-maxCardinality * Math.log(1 - estimate /
maxCardinality));
+ }
+
+ return Math.round(estimate);
+ }
+
+ /** Resets the estimator. */
+ public void reset() {
+ Arrays.fill(registers, 0);
+ }
+
+ /**
+ * Merges another HyperLogLog instance into this one.
+ *
+ * @param other The other HyperLogLog instance to merge
+ * @throws IllegalArgumentException if the precision doesn't match
+ */
+ public void merge(HyperLogLog other) {
+ if (this.m != other.m) {
+ throw new IllegalArgumentException(
+ "Cannot merge HyperLogLog instances with different precision");
+ }
+
+ for (int i = 0; i < m; i++) {
+ registers[i] = Math.max(registers[i], other.registers[i]);
+ }
+ }
+
+ // serialize
+ public byte[] serialize() {
+ int totalBytes = Integer.BYTES * 2 + registers.length * Integer.BYTES;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(totalBytes);
+ ReadWriteIOUtils.write(b, byteBuffer);
+ ReadWriteIOUtils.write(m, byteBuffer);
+ for (int i = 0; i < m; i++) {
+ ReadWriteIOUtils.write(registers[i], byteBuffer);
+ }
+ return byteBuffer.array();
+ }
+
+ public boolean equals(HyperLogLog hll) {
+ return Arrays.equals(this.serialize(), hll.serialize());
+ }
+
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE + Math.toIntExact(registers.length * Integer.BYTES);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogStateFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogStateFactory.java
new file mode 100644
index 00000000000..3e98087f343
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogStateFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.HyperLogLogBigArray;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import static java.util.Objects.requireNonNull;
+
+public class HyperLogLogStateFactory {
+ public static SingleHyperLogLogState createSingleState() {
+ return new SingleHyperLogLogState();
+ }
+
+ public static GroupedHyperLogLogState createGroupedState() {
+ return new GroupedHyperLogLogState();
+ }
+
+ public static class SingleHyperLogLogState {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SingleHyperLogLogState.class);
+ private HyperLogLog hll;
+
+ public HyperLogLog getHyperLogLog() {
+ return hll;
+ }
+
+ public void setHyperLogLog(HyperLogLog value) {
+ hll = value;
+ }
+
+ public long getEstimatedSize() {
+ // not used
+ return INSTANCE_SIZE + hll.getEstimatedSize();
+ }
+
+ public void merge(HyperLogLog other) {
+ if (this.hll == null) {
+ setHyperLogLog(other);
+ } else {
+ hll.merge(other);
+ }
+ }
+ }
+
+ public static class GroupedHyperLogLogState {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(GroupedHyperLogLogState.class);
+ private HyperLogLogBigArray hlls = new HyperLogLogBigArray();
+
+ public HyperLogLogBigArray getHyperLogLogs() {
+ return hlls;
+ }
+
+ public void setHyperLogLogs(HyperLogLogBigArray value) {
+ requireNonNull(value, "value is null");
+ this.hlls = value;
+ }
+
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE + hlls.sizeOf();
+ }
+
+ public void merge(int groupId, HyperLogLog hll) {
+ HyperLogLog existingHll = hlls.get(groupId, hll);
+ if (!existingHll.equals(hll)) {
+ existingHll.merge(hll);
+ }
+ }
+
+ public boolean isEmpty() {
+ return hlls.isEmpty();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java
new file mode 100644
index 00000000000..648877977d2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLogStateFactory;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.HyperLogLogBigArray;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
+
+public class GroupedApproxCountDistinctAccumulator implements
GroupedAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(GroupedApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+
+ private final HyperLogLogStateFactory.GroupedHyperLogLogState state =
+ HyperLogLogStateFactory.createGroupedState();
+
+ public GroupedApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE + state.getEstimatedSize();
+ }
+
+ @Override
+ public void setGroupCount(long groupCount) {
+ HyperLogLogBigArray hlls = state.getHyperLogLogs();
+ hlls.ensureCapacity(groupCount);
+ }
+
+ /**
+ * Consumes a batch of raw input rows. arguments[0] is the measured column; the
+ * optional arguments[1] carries the max standard error.
+ * NOTE(review): the error bound is read from row 0 only — assumes it is a
+ * constant literal column; confirm against the parser/planner.
+ */
+ @Override
+ public void addInput(int[] groupIds, Column[] arguments, AggregationMask
mask) {
+ double maxStandardError =
+ arguments.length == 1 ? DEFAULT_STANDARD_ERROR :
arguments[1].getDouble(0);
+ HyperLogLogBigArray hlls = getOrCreateHyperLogLog(state);
+
+ // Dispatch on the series type; each helper walks the mask and feeds the sketches.
+ switch (seriesDataType) {
+ case BOOLEAN:
+ addBooleanInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+ return;
+ case INT32:
+ case DATE:
+ addIntInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+ return;
+ case INT64:
+ case TIMESTAMP:
+ addLongInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+ return;
+ case FLOAT:
+ addFloatInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+ return;
+ case DOUBLE:
+ addDoubleInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+ return;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ addBinaryInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+ return;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "Unsupported data type in APPROX_COUNT_DISTINCT Aggregation:
%s", seriesDataType));
+ }
+ }
+
+ /**
+ * Merges serialized intermediate sketches (one Binary per row, produced by
+ * evaluateIntermediate) into the per-group state; null rows are skipped.
+ */
+ @Override
+ public void addIntermediate(int[] groupIds, Column argument) {
+ for (int i = 0; i < groupIds.length; i++) {
+ int groupId = groupIds[i];
+ if (!argument.isNull(i)) {
+ HyperLogLog current = new
HyperLogLog(argument.getBinary(i).getValues());
+ state.merge(groupId, current);
+ }
+ }
+ }
+
+ // Writes the group's sketch in serialized form for partial-aggregation exchange.
+ // NOTE(review): assumes every evaluated group saw at least one input row, so
+ // hlls.get(groupId) is non-null; otherwise this NPEs — confirm with the framework.
+ @Override
+ public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
+ HyperLogLogBigArray hlls = state.getHyperLogLogs();
+ columnBuilder.writeBinary(new Binary(hlls.get(groupId).serialize()));
+ }
+
+ // Writes the final estimated distinct count for the group.
+ // NOTE(review): same non-null assumption as evaluateIntermediate — a group with
+ // no input would NPE here; confirm with the framework.
+ @Override
+ public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+ HyperLogLogBigArray hlls = state.getHyperLogLogs();
+ columnBuilder.writeLong(hlls.get(groupId).cardinality());
+ }
+
+ @Override
+ public void prepareFinal() {}
+
+ @Override
+ public void reset() {
+ state.getHyperLogLogs().reset();
+ }
+
+  /**
+   * Accumulates boolean input values into each group's HyperLogLog sketch.
+   * The sketch is fetched (and lazily created) before the null check so every
+   * touched group owns a sketch by evaluation time.
+   */
+  public void addBooleanInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        HyperLogLog hll = hlls.get(groupIds[i], maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getBoolean(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        HyperLogLog hll = hlls.get(groupIds[position], maxStandardError);
+        if (!column.isNull(position)) {
+          // Fix: read the value at the selected position, not at the loop index.
+          hll.add(column.getBoolean(position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Accumulates int (INT32/DATE) input values into each group's HyperLogLog
+   * sketch. The sketch is fetched (and lazily created) before the null check so
+   * every touched group owns a sketch by evaluation time.
+   */
+  public void addIntInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        HyperLogLog hll = hlls.get(groupIds[i], maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        HyperLogLog hll = hlls.get(groupIds[position], maxStandardError);
+        if (!column.isNull(position)) {
+          // Fix: read the value at the selected position, not at the loop index.
+          hll.add(column.getInt(position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Accumulates long (INT64/TIMESTAMP) input values into each group's
+   * HyperLogLog sketch. The sketch is fetched (and lazily created) before the
+   * null check so every touched group owns a sketch by evaluation time.
+   */
+  public void addLongInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        HyperLogLog hll = hlls.get(groupIds[i], maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getLong(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        HyperLogLog hll = hlls.get(groupIds[position], maxStandardError);
+        if (!column.isNull(position)) {
+          // Fix: read the value at the selected position, not at the loop index.
+          hll.add(column.getLong(position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Accumulates float input values into each group's HyperLogLog sketch. The
+   * sketch is fetched (and lazily created) before the null check so every
+   * touched group owns a sketch by evaluation time.
+   */
+  public void addFloatInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        HyperLogLog hll = hlls.get(groupIds[i], maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getFloat(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        HyperLogLog hll = hlls.get(groupIds[position], maxStandardError);
+        if (!column.isNull(position)) {
+          // Fix: read the value at the selected position, not at the loop index.
+          hll.add(column.getFloat(position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Accumulates double input values into each group's HyperLogLog sketch. The
+   * sketch is fetched (and lazily created) before the null check so every
+   * touched group owns a sketch by evaluation time.
+   */
+  public void addDoubleInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        HyperLogLog hll = hlls.get(groupIds[i], maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getDouble(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        HyperLogLog hll = hlls.get(groupIds[position], maxStandardError);
+        if (!column.isNull(position)) {
+          // Fix: read the value at the selected position, not at the loop index.
+          hll.add(column.getDouble(position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Accumulates binary (TEXT/STRING/BLOB) input values into each group's
+   * HyperLogLog sketch. The sketch is fetched (and lazily created) before the
+   * null check so every touched group owns a sketch by evaluation time.
+   */
+  public void addBinaryInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        HyperLogLog hll = hlls.get(groupIds[i], maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getBinary(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        HyperLogLog hll = hlls.get(groupIds[position], maxStandardError);
+        if (!column.isNull(position)) {
+          // Fix: read the value at the selected position, not at the loop index.
+          hll.add(column.getBinary(position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the state's HyperLogLog big array.
+   *
+   * <p>GroupedHyperLogLogState always initializes its array and its setter
+   * rejects null, so the array is never absent. We must NOT swap in a fresh
+   * array when the state is still empty: doing so would discard the capacity
+   * ensured by {@code setGroupCount} and risk out-of-bounds writes for large
+   * group counts.
+   */
+  public static HyperLogLogBigArray getOrCreateHyperLogLog(
+      HyperLogLogStateFactory.GroupedHyperLogLogState state) {
+    return state.getHyperLogLogs();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
index 9346e5da3af..f008e704618 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
@@ -33,10 +33,6 @@ public final class BinaryBigArray {
array = new ObjectBigArray<>();
}
- public BinaryBigArray(Binary slice) {
- array = new ObjectBigArray<>(slice);
- }
-
/** Returns the size of this big array in bytes. */
public long sizeOf() {
return INSTANCE_SIZE + array.sizeOf() + sizeOfBinarys;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/HyperLogLogBigArray.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/HyperLogLogBigArray.java
new file mode 100644
index 00000000000..c036359b936
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/HyperLogLogBigArray.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array;
+
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
+
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOf;
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
+
+/** A growable array of per-group HyperLogLog sketches with retained-size accounting. */
+public final class HyperLogLogBigArray {
+  private static final long INSTANCE_SIZE = shallowSizeOfInstance(HyperLogLogBigArray.class);
+  private final ObjectBigArray<HyperLogLog> array;
+  // Sum of the estimated sizes of all sketches stored via set(), in bytes.
+  private long sizeOfHyperLogLog;
+
+  public HyperLogLogBigArray() {
+    array = new ObjectBigArray<>();
+  }
+
+  /** Returns the retained size of this big array in bytes. */
+  public long sizeOf() {
+    // Fix: use array.sizeOf() (which includes the segment arrays) instead of
+    // shallowSizeOf(array), consistent with BinaryBigArray/MapBigArray accounting.
+    return INSTANCE_SIZE + array.sizeOf() + sizeOfHyperLogLog;
+  }
+
+  /** Returns the element at {@code index}. Only use if certain the object exists. */
+  public HyperLogLog get(long index) {
+    return array.get(index);
+  }
+
+  /** Returns the element at {@code index}, creating one with the given error bound if absent. */
+  public HyperLogLog get(long index, double maxStandardError) {
+    return get(index, new HyperLogLog(maxStandardError));
+  }
+
+  /** Returns the element at {@code index}, installing {@code hll} when the slot is empty. */
+  public HyperLogLog get(long index, HyperLogLog hll) {
+    HyperLogLog result = array.get(index);
+    if (result == null) {
+      set(index, hll);
+      return hll;
+    }
+    return result;
+  }
+
+  /** Stores {@code hll} at {@code index}, updating the retained-size accounting. */
+  public void set(long index, HyperLogLog hll) {
+    updateRetainedSize(index, hll);
+    array.set(index, hll);
+  }
+
+  /** True while no sketch has been stored (tracked via the retained-size counter). */
+  public boolean isEmpty() {
+    return sizeOfHyperLogLog == 0;
+  }
+
+  public void ensureCapacity(long length) {
+    array.ensureCapacity(length);
+  }
+
+  /** Adjusts retained-size accounting when the slot at {@code index} is replaced by {@code value}. */
+  public void updateRetainedSize(long index, HyperLogLog value) {
+    HyperLogLog hll = array.get(index);
+    if (hll != null) {
+      sizeOfHyperLogLog -= hll.getEstimatedSize();
+    }
+    if (value != null) {
+      sizeOfHyperLogLog += value.getEstimatedSize();
+    }
+  }
+
+  /** Clears the contents of every stored sketch; the instances themselves are kept. */
+  public void reset() {
+    // NOTE(review): sizeOfHyperLogLog is left untouched — assumes
+    // HyperLogLog#reset keeps its registers allocated; confirm.
+    array.forEach(
+        item -> {
+          if (item != null) {
+            item.reset();
+          }
+        });
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
index 47659708e03..1be73705b31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
@@ -35,10 +35,6 @@ public final class MapBigArray {
array = new ObjectBigArray<>();
}
- public MapBigArray(HashMap<TsPrimitiveType, Long> slice) {
- array = new ObjectBigArray<>(slice);
- }
-
/** Returns the size of this big array in bytes. */
public long sizeOf() {
return INSTANCE_SIZE + array.sizeOf() + sizeOfMaps;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
index 6c5e472d167..51a7401c811 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
@@ -36,19 +36,12 @@ public final class ObjectBigArray<T> {
private static final long INSTANCE_SIZE =
shallowSizeOfInstance(ObjectBigArray.class);
private static final long SIZE_OF_SEGMENT = sizeOfObjectArray(SEGMENT_SIZE);
- private final Object initialValue;
-
private Object[][] array;
private long capacity;
private int segments;
/** Creates a new big array containing one initial segment */
public ObjectBigArray() {
- this(null);
- }
-
- public ObjectBigArray(Object initialValue) {
- this.initialValue = initialValue;
array = new Object[INITIAL_SEGMENTS][];
allocateNewSegment();
}
@@ -121,7 +114,7 @@ public final class ObjectBigArray<T> {
}
public void reset() {
- fill((T) initialValue);
+ fill(null);
}
public void forEach(Consumer<T> action) {
@@ -185,9 +178,6 @@ public final class ObjectBigArray<T> {
private void allocateNewSegment() {
Object[] newSegment = new Object[SEGMENT_SIZE];
- if (initialValue != null) {
- Arrays.fill(newSegment, initialValue);
- }
array[segments] = newSegment;
capacity += SEGMENT_SIZE;
segments++;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index 756a6902a5b..bd4fe3f7a56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -630,6 +630,20 @@ public class TableMetadataImpl implements Metadata {
}
break;
+ case SqlConstant.APPROX_COUNT_DISTINCT:
+ if (argumentTypes.size() != 1 && argumentTypes.size() != 2) {
+ throw new SemanticException(
+ String.format(
+                "Aggregate functions [%s] should have one or two arguments", functionName));
+ }
+
+ if (argumentTypes.size() == 2 &&
!isSupportedMathNumericType(argumentTypes.get(1))) {
+ throw new SemanticException(
+ String.format(
+                "Second argument of Aggregate functions [%s] should be numeric type and do not use expression",
+ functionName));
+ }
+
case SqlConstant.COUNT:
break;
default:
@@ -641,6 +655,7 @@ public class TableMetadataImpl implements Metadata {
case SqlConstant.COUNT:
case SqlConstant.COUNT_ALL:
case SqlConstant.COUNT_IF:
+ case SqlConstant.APPROX_COUNT_DISTINCT:
return INT64;
case SqlConstant.FIRST_AGGREGATION:
case SqlConstant.LAST_AGGREGATION:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 4b9938ce8e7..c829153b4b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -265,6 +265,7 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSe
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.ROLLUP;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName.mapIdentifier;
import static org.apache.iotdb.db.utils.TimestampPrecisionUtils.currPrecision;
+import static
org.apache.iotdb.db.utils.constant.SqlConstant.APPROX_COUNT_DISTINCT;
import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION;
import static
org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION;
import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_AGGREGATION;
@@ -2765,6 +2766,14 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
"The third argument of 'first_by' or 'last_by' function must be
'time'",
ctx);
}
+ } else if (name.toString().equalsIgnoreCase(APPROX_COUNT_DISTINCT)) {
+ if (arguments.size() == 2
+ && !(arguments.get(1) instanceof DoubleLiteral
+ || arguments.get(1) instanceof LongLiteral
+ || arguments.get(1) instanceof StringLiteral)) {
+ throw new SemanticException(
+ "The second argument of 'approx_count_distinct' function must be a
literal");
+ }
}
return new FunctionCall(getLocation(ctx), name, distinct, arguments);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
index 625af893561..63127569488 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
@@ -78,6 +78,8 @@ public class SqlConstant {
public static final String COUNT_TIME = "count_time";
public static final String COUNT_TIME_HEADER = "count_time(*)";
+ public static final String APPROX_COUNT_DISTINCT = "approx_count_distinct";
+
// names of scalar functions
public static final String DIFF = "diff";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
index 3d0510957d1..2dd5d38e5af 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
@@ -57,6 +57,7 @@ public enum TableBuiltinAggregationFunction {
VARIANCE("variance"),
VAR_POP("var_pop"),
VAR_SAMP("var_samp"),
+ APPROX_COUNT_DISTINCT("approx_count_distinct"),
;
private final String functionName;
@@ -102,6 +103,7 @@ public enum TableBuiltinAggregationFunction {
case "variance":
case "var_pop":
case "var_samp":
+ case "approx_count_distinct":
return RowType.anonymous(Collections.emptyList());
case "extreme":
case "max":
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index 161c8525cca..c6aa8d1d89f 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -284,7 +284,8 @@ enum TAggregationType {
LAST_BY,
MIN,
MAX,
- COUNT_ALL
+ COUNT_ALL,
+ APPROX_COUNT_DISTINCT
}
struct TShowConfigurationTemplateResp {