This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 923bb2c347c Add APPROX_COUNT_DISTINCT Function
923bb2c347c is described below

commit 923bb2c347c53e7f5949da93237427880c3c7751
Author: FearfulTomcat27 <[email protected]>
AuthorDate: Fri Apr 25 11:19:40 2025 +0800

    Add APPROX_COUNT_DISTINCT Function
---
 .../it/query/recent/IoTDBTableAggregationIT.java   |  69 +++++
 .../relational/aggregation/AccumulatorFactory.java |   5 +
 .../ApproxCountDistinctAccumulator.java            | 265 +++++++++++++++++
 .../source/relational/aggregation/HyperLogLog.java | 246 ++++++++++++++++
 .../aggregation/HyperLogLogStateFactory.java       |  88 ++++++
 .../GroupedApproxCountDistinctAccumulator.java     | 314 +++++++++++++++++++++
 .../aggregation/grouped/array/BinaryBigArray.java  |   4 -
 .../grouped/array/HyperLogLogBigArray.java         |  84 ++++++
 .../aggregation/grouped/array/MapBigArray.java     |   4 -
 .../aggregation/grouped/array/ObjectBigArray.java  |  12 +-
 .../relational/metadata/TableMetadataImpl.java     |  15 +
 .../plan/relational/sql/parser/AstBuilder.java     |   9 +
 .../iotdb/db/utils/constant/SqlConstant.java       |   2 +
 .../TableBuiltinAggregationFunction.java           |   2 +
 .../thrift-commons/src/main/thrift/common.thrift   |   3 +-
 15 files changed, 1102 insertions(+), 20 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index cf6f0217c95..274868fdfc6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -4106,6 +4106,59 @@ public class IoTDBTableAggregationIT {
         DATABASE_NAME);
   }
 
+  @Test
+  public void approxCountDistinctTest() {
+    String[] expectedHeader = buildHeaders(17);
+    String[] retArray = new String[] 
{"10,2,2,4,16,2,2,5,5,5,5,2,24,32,5,10,1,"};
+    tableResultSetEqualTest(
+        "select approx_count_distinct(time), approx_count_distinct(province), 
approx_count_distinct(city), approx_count_distinct(region), 
approx_count_distinct(device_id), approx_count_distinct(color), 
approx_count_distinct(type), approx_count_distinct(s1), 
approx_count_distinct(s2), approx_count_distinct(s3), 
approx_count_distinct(s4), approx_count_distinct(s5), 
approx_count_distinct(s6), approx_count_distinct(s7), 
approx_count_distinct(s8), approx_count_distinct(s9), approx_count_disti [...]
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    tableResultSetEqualTest(
+        "select approx_count_distinct(time, 0.02), 
approx_count_distinct(province, 0.02), approx_count_distinct(city, 0.02), 
approx_count_distinct(region, 0.02), approx_count_distinct(device_id, 0.02), 
approx_count_distinct(color, 0.02), approx_count_distinct(type, 0.02), 
approx_count_distinct(s1, 0.02), approx_count_distinct(s2, 0.02), 
approx_count_distinct(s3, 0.02), approx_count_distinct(s4, 0.02), 
approx_count_distinct(s5, 0.02), approx_count_distinct(s6, 0.02), 
approx_count_distinct [...]
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    retArray =
+        new String[] {
+          "2024-09-24T06:15:30.000Z,beijing,2,2,",
+          "2024-09-24T06:15:31.000Z,beijing,0,0,",
+          "2024-09-24T06:15:35.000Z,beijing,2,2,",
+          "2024-09-24T06:15:36.000Z,beijing,2,4,",
+          "2024-09-24T06:15:40.000Z,beijing,0,4,",
+          "2024-09-24T06:15:41.000Z,beijing,2,0,",
+          "2024-09-24T06:15:46.000Z,beijing,0,2,",
+          "2024-09-24T06:15:50.000Z,beijing,0,2,",
+          "2024-09-24T06:15:51.000Z,beijing,2,0,",
+          "2024-09-24T06:15:55.000Z,beijing,2,0,",
+          "2024-09-24T06:15:30.000Z,shanghai,2,2,",
+          "2024-09-24T06:15:31.000Z,shanghai,0,0,",
+          "2024-09-24T06:15:35.000Z,shanghai,2,2,",
+          "2024-09-24T06:15:36.000Z,shanghai,2,4,",
+          "2024-09-24T06:15:40.000Z,shanghai,0,4,",
+          "2024-09-24T06:15:41.000Z,shanghai,2,0,",
+          "2024-09-24T06:15:46.000Z,shanghai,0,2,",
+          "2024-09-24T06:15:50.000Z,shanghai,0,2,",
+          "2024-09-24T06:15:51.000Z,shanghai,2,0,",
+          "2024-09-24T06:15:55.000Z,shanghai,2,0,",
+        };
+
+    tableResultSetEqualTest(
+        "select 
time,province,approx_count_distinct(s6),approx_count_distinct(s7) from table1 
group by 1,2 order by 2,1",
+        new String[] {"time", "province", "_col2", "_col3"},
+        retArray,
+        DATABASE_NAME);
+
+    tableResultSetEqualTest(
+        "select 
time,province,approx_count_distinct(s6,0.02),approx_count_distinct(s7,0.02) 
from table1 group by 1,2 order by 2,1",
+        new String[] {"time", "province", "_col2", "_col3"},
+        retArray,
+        DATABASE_NAME);
+  }
+
   @Test
   public void exceptionTest() {
     tableAssertTestFail(
@@ -4136,6 +4189,22 @@ public class IoTDBTableAggregationIT {
         "select last_by() from table1",
         "701: Aggregate functions [last_by] should only have two or three 
arguments",
         DATABASE_NAME);
+    tableAssertTestFail(
+        "select approx_count_distinct() from table1",
+        "701: Aggregate functions [approx_count_distinct] should only have two 
arguments",
+        DATABASE_NAME);
+    tableAssertTestFail(
+        "select approx_count_distinct(province, 0.3) from table1",
+        "750: Max Standard Error must be in [0.0040625, 0.26]: 0.3",
+        DATABASE_NAME);
+    tableAssertTestFail(
+        "select approx_count_distinct(province, 0.3) from table1",
+        "750: Max Standard Error must be in [0.0040625, 0.26]: 0.3",
+        DATABASE_NAME);
+    tableAssertTestFail(
+        "select approx_count_distinct(province, 'test') from table1",
+        "701: Second argument of Aggregate functions [approx_count_distinct] 
should be numberic type and do not use expression",
+        DATABASE_NAME);
   }
 
   // ==================================================================
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index a9b4122375f..0c9edd7e735 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.udf.utils.TableUDFUtils;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.VarianceAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxCountDistinctAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountIfAccumulator;
@@ -240,6 +241,8 @@ public class AccumulatorFactory {
       case VAR_POP:
         return new GroupedVarianceAccumulator(
             inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
+      case APPROX_COUNT_DISTINCT:
+        return new 
GroupedApproxCountDistinctAccumulator(inputDataTypes.get(0));
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
@@ -305,6 +308,8 @@ public class AccumulatorFactory {
       case VAR_POP:
         return new TableVarianceAccumulator(
             inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
+      case APPROX_COUNT_DISTINCT:
+        return new ApproxCountDistinctAccumulator(inputDataTypes.get(0));
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java
new file mode 100644
index 00000000000..56f8ff43595
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
+
+public class ApproxCountDistinctAccumulator implements TableAccumulator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(ApproxCountDistinctAccumulator.class);
+  private final TSDataType seriesDataType;
+  private final HyperLogLogStateFactory.SingleHyperLogLogState state =
+      HyperLogLogStateFactory.createSingleState();
+
+  private static final int DEFAULT_HYPERLOGLOG_BUCKET_SIZE = 2048;
+
+  public ApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE
+        + RamUsageEstimator.shallowSizeOfInstance(HyperLogLog.class)
+        + Integer.BYTES * DEFAULT_HYPERLOGLOG_BUCKET_SIZE;
+  }
+
+  @Override
+  public TableAccumulator copy() {
+    return new ApproxCountDistinctAccumulator(seriesDataType);
+  }
+
+  @Override
+  public void addInput(Column[] arguments, AggregationMask mask) {
+    double maxStandardError =
+        arguments.length == 1 ? DEFAULT_STANDARD_ERROR : 
arguments[1].getDouble(0);
+    HyperLogLog hll = getOrCreateHyperLogLog(state, maxStandardError);
+
+    switch (seriesDataType) {
+      case INT32:
+      case DATE:
+        addIntInput(arguments[0], mask, hll);
+        return;
+      case INT64:
+      case TIMESTAMP:
+        addLongInput(arguments[0], mask, hll);
+        return;
+      case FLOAT:
+        addFloatInput(arguments[0], mask, hll);
+        return;
+      case DOUBLE:
+        addDoubleInput(arguments[0], mask, hll);
+        return;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        addBinaryInput(arguments[0], mask, hll);
+        return;
+      case BOOLEAN:
+        addBooleanInput(arguments[0], mask, hll);
+        return;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: 
%s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addIntermediate(Column argument) {
+
+    for (int i = 0; i < argument.getPositionCount(); i++) {
+      if (!argument.isNull(i)) {
+        HyperLogLog current = new 
HyperLogLog(argument.getBinary(i).getValues());
+        state.merge(current);
+      }
+    }
+  }
+
+  @Override
+  public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output of APPROX_COUNT_DISTINCT should be 
BinaryColumn");
+    columnBuilder.writeBinary(new Binary(state.getHyperLogLog().serialize()));
+  }
+
+  @Override
+  public void evaluateFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(state.getHyperLogLog().cardinality());
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public void addStatistics(Statistics[] statistics) {
+    throw new UnsupportedOperationException(
+        "ApproxCountDistinctAccumulator does not support statistics");
+  }
+
+  @Override
+  public void reset() {
+    state.getHyperLogLog().reset();
+  }
+
+  public void addBooleanInput(Column valueColumn, AggregationMask mask, 
HyperLogLog hll) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+        if (!valueColumn.isNull(i)) {
+          hll.add(valueColumn.getBoolean(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          hll.add(valueColumn.getBoolean(position));
+        }
+      }
+    }
+  }
+
+  public void addIntInput(Column valueColumn, AggregationMask mask, 
HyperLogLog hll) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+        if (!valueColumn.isNull(i)) {
+          hll.add(valueColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          hll.add(valueColumn.getInt(position));
+        }
+      }
+    }
+  }
+
+  public void addLongInput(Column valueColumn, AggregationMask mask, 
HyperLogLog hll) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+        if (!valueColumn.isNull(i)) {
+          hll.add(valueColumn.getLong(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          hll.add(valueColumn.getLong(position));
+        }
+      }
+    }
+  }
+
+  public void addFloatInput(Column valueColumn, AggregationMask mask, 
HyperLogLog hll) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+        if (!valueColumn.isNull(i)) {
+          hll.add(valueColumn.getFloat(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          hll.add(valueColumn.getFloat(position));
+        }
+      }
+    }
+  }
+
+  public void addDoubleInput(Column valueColumn, AggregationMask mask, 
HyperLogLog hll) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+        if (!valueColumn.isNull(i)) {
+          hll.add(valueColumn.getDouble(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          hll.add(valueColumn.getDouble(position));
+        }
+      }
+    }
+  }
+
+  public void addBinaryInput(Column valueColumn, AggregationMask mask, 
HyperLogLog hll) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+        if (!valueColumn.isNull(i)) {
+          hll.add(valueColumn.getBinary(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          hll.add(valueColumn.getBinary(position));
+        }
+      }
+    }
+  }
+
+  public static HyperLogLog getOrCreateHyperLogLog(
+      HyperLogLogStateFactory.SingleHyperLogLogState state, double 
maxStandardError) {
+    HyperLogLog hll = state.getHyperLogLog();
+    if (hll == null) {
+      hll = new HyperLogLog(maxStandardError);
+      state.setHyperLogLog(hll);
+    }
+    return hll;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java
new file mode 100644
index 00000000000..94836eba861
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+  private final int[] registers;
+  // Number of registers
+  private final int m;
+  // Number of bits used for register indexing
+  private final int b;
+  // Alpha constant for bias correction
+  private final double alpha;
+
+  private static final HashFunction hashFunction = Hashing.murmur3_128();
+
+  public static final double DEFAULT_STANDARD_ERROR = 0.023;
+  private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625;
+  private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000;
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(HyperLogLog.class);
+
+  /**
+   * Constructs a HyperLogLog with the given precision.
+   *
+   * <p>The precision parameter (4 <= precision <= 16)
+   */
+  public HyperLogLog(double maxStandardError) {
+    int buckets = standardErrorToBuckets(maxStandardError);
+    int precision = indexBitLength(buckets);
+
+    this.b = precision;
+    // m = 2^precision, buckets
+    this.m = buckets;
+    this.registers = new int[m];
+
+    // Set alpha based on precision
+    this.alpha = getAlpha(precision, m);
+  }
+
+  public HyperLogLog(byte[] bytes) {
+    // deserialize
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    this.b = ReadWriteIOUtils.readInt(byteBuffer);
+    this.m = ReadWriteIOUtils.readInt(byteBuffer);
+
+    this.registers = new int[m];
+    for (int i = 0; i < m; i++) {
+      registers[i] = ReadWriteIOUtils.readInt(byteBuffer);
+    }
+    this.alpha = getAlpha(b, m);
+  }
+
+  private static double getAlpha(int precision, int m) {
+    switch (precision) {
+      case 4:
+        return 0.673;
+      case 5:
+        return 0.697;
+      case 6:
+        return 0.709;
+      default:
+        return 0.7213 / (1 + 1.079 / m);
+    }
+  }
+
+  private static boolean isPowerOf2(long value) {
+    Preconditions.checkArgument(value > 0L, "value must be positive");
+    return (value & value - 1L) == 0L;
+  }
+
+  private static int indexBitLength(int numberOfBuckets) {
+    Preconditions.checkArgument(
+        isPowerOf2(numberOfBuckets),
+        "numberOfBuckets must be a power of 2, actual: %s",
+        numberOfBuckets);
+    return Integer.numberOfTrailingZeros(numberOfBuckets);
+  }
+
+  private static int standardErrorToBuckets(double maxStandardError) {
+    if (maxStandardError <= LOWEST_MAX_STANDARD_ERROR
+        || maxStandardError >= HIGHEST_MAX_STANDARD_ERROR) {
+      throw new IoTDBRuntimeException(
+          String.format(
+              "Max Standard Error must be in [%s, %s]: %s",
+              LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR, 
maxStandardError),
+          NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode(),
+          true);
+    }
+    return log2Ceiling((int) Math.ceil(1.04 / (maxStandardError * 
maxStandardError)));
+  }
+
+  private static int log2Ceiling(int value) {
+    return Integer.highestOneBit(value - 1) << 1;
+  }
+
+  public void add(boolean value) {
+    offer(hashFunction.hashString(String.valueOf(value), 
StandardCharsets.UTF_8).asLong());
+  }
+
+  public void add(int value) {
+    offer(hashFunction.hashInt(value).asLong());
+  }
+
+  public void add(long value) {
+    offer(hashFunction.hashLong(value).asLong());
+  }
+
+  public void add(float value) {
+    offer(hashFunction.hashString(String.valueOf(value), 
StandardCharsets.UTF_8).asLong());
+  }
+
+  public void add(double value) {
+    offer(hashFunction.hashString(String.valueOf(value), 
StandardCharsets.UTF_8).asLong());
+  }
+
+  public void add(Binary value) {
+    offer(hashFunction.hashBytes(value.getValues()).asLong());
+  }
+
+  /**
+   * Adds a value to the estimator.
+   *
+   * <p>The value to add
+   */
+  public void offer(long hash) {
+    // Compute hash of the value
+
+    // Extract the first b bits for the register index
+    int idx = (int) (hash & (m - 1));
+
+    // Count the number of leading zeros in the remaining bits
+    // Add 1 to get the position of the leftmost 1
+
+    int leadingZeros = Long.numberOfTrailingZeros(hash >>> b) + 1;
+
+    // Update the register if the new value is larger
+    registers[idx] = Math.max(registers[idx], leadingZeros);
+  }
+
+  /**
+   * Returns the estimated cardinality of the data set.
+   *
+   * @return The estimated cardinality
+   */
+  public long cardinality() {
+    double sum = 0;
+    int zeros = 0;
+
+    // Compute the harmonic mean of 2^register[i]
+    for (int i = 0; i < m; i++) {
+      sum += 1.0 / (1 << registers[i]);
+      if (registers[i] == 0) {
+        zeros++;
+      }
+    }
+
+    // Apply bias correction formula
+    double estimate = alpha * m * m / sum;
+
+    // Small range correction
+    if (estimate <= 2.5 * m) {
+      if (zeros > 0) {
+        // Linear counting for small cardinalities
+        return Math.round(m * Math.log((double) m / zeros));
+      }
+    }
+
+    // Large range correction (for values > 2^32 / 30)
+    double maxCardinality = (double) (1L << 32);
+    if (estimate > maxCardinality / 30) {
+      return Math.round(-maxCardinality * Math.log(1 - estimate / 
maxCardinality));
+    }
+
+    return Math.round(estimate);
+  }
+
+  /** Resets the estimator. */
+  public void reset() {
+    Arrays.fill(registers, 0);
+  }
+
+  /**
+   * Merges another HyperLogLog instance into this one.
+   *
+   * @param other The other HyperLogLog instance to merge
+   * @throws IllegalArgumentException if the precision doesn't match
+   */
+  public void merge(HyperLogLog other) {
+    if (this.m != other.m) {
+      throw new IllegalArgumentException(
+          "Cannot merge HyperLogLog instances with different precision");
+    }
+
+    for (int i = 0; i < m; i++) {
+      registers[i] = Math.max(registers[i], other.registers[i]);
+    }
+  }
+
+  // serialize
+  public byte[] serialize() {
+    int totalBytes = Integer.BYTES * 2 + registers.length * Integer.BYTES;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(totalBytes);
+    ReadWriteIOUtils.write(b, byteBuffer);
+    ReadWriteIOUtils.write(m, byteBuffer);
+    for (int i = 0; i < m; i++) {
+      ReadWriteIOUtils.write(registers[i], byteBuffer);
+    }
+    return byteBuffer.array();
+  }
+
+  public boolean equals(HyperLogLog hll) {
+    return Arrays.equals(this.serialize(), hll.serialize());
+  }
+
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE + Math.toIntExact(registers.length * Integer.BYTES);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogStateFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogStateFactory.java
new file mode 100644
index 00000000000..3e98087f343
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogStateFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.HyperLogLogBigArray;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import static java.util.Objects.requireNonNull;
+
+public class HyperLogLogStateFactory {
+  public static SingleHyperLogLogState createSingleState() {
+    return new SingleHyperLogLogState();
+  }
+
+  public static GroupedHyperLogLogState createGroupedState() {
+    return new GroupedHyperLogLogState();
+  }
+
+  public static class SingleHyperLogLogState {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(SingleHyperLogLogState.class);
+    private HyperLogLog hll;
+
+    public HyperLogLog getHyperLogLog() {
+      return hll;
+    }
+
+    public void setHyperLogLog(HyperLogLog value) {
+      hll = value;
+    }
+
+    public long getEstimatedSize() {
+      // not used
+      return INSTANCE_SIZE + hll.getEstimatedSize();
+    }
+
+    public void merge(HyperLogLog other) {
+      if (this.hll == null) {
+        setHyperLogLog(other);
+      } else {
+        hll.merge(other);
+      }
+    }
+  }
+
+  public static class GroupedHyperLogLogState {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(GroupedHyperLogLogState.class);
+    private HyperLogLogBigArray hlls = new HyperLogLogBigArray();
+
+    public HyperLogLogBigArray getHyperLogLogs() {
+      return hlls;
+    }
+
+    public void setHyperLogLogs(HyperLogLogBigArray value) {
+      requireNonNull(value, "value is null");
+      this.hlls = value;
+    }
+
+    public long getEstimatedSize() {
+      return INSTANCE_SIZE + hlls.sizeOf();
+    }
+
+    public void merge(int groupId, HyperLogLog hll) {
+      HyperLogLog existingHll = hlls.get(groupId, hll);
+      if (!existingHll.equals(hll)) {
+        existingHll.merge(hll);
+      }
+    }
+
+    public boolean isEmpty() {
+      return hlls.isEmpty();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java
new file mode 100644
index 00000000000..648877977d2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLogStateFactory;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.HyperLogLogBigArray;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
+
+public class GroupedApproxCountDistinctAccumulator implements 
GroupedAccumulator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(GroupedApproxCountDistinctAccumulator.class);
+  private final TSDataType seriesDataType;
+
+  private final HyperLogLogStateFactory.GroupedHyperLogLogState state =
+      HyperLogLogStateFactory.createGroupedState();
+
+  public GroupedApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE + state.getEstimatedSize();
+  }
+
+  @Override
+  public void setGroupCount(long groupCount) {
+    HyperLogLogBigArray hlls = state.getHyperLogLogs();
+    hlls.ensureCapacity(groupCount);
+  }
+
+  @Override
+  public void addInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    double maxStandardError =
+        arguments.length == 1 ? DEFAULT_STANDARD_ERROR : 
arguments[1].getDouble(0);
+    HyperLogLogBigArray hlls = getOrCreateHyperLogLog(state);
+
+    switch (seriesDataType) {
+      case BOOLEAN:
+        addBooleanInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+        return;
+      case INT32:
+      case DATE:
+        addIntInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+        return;
+      case INT64:
+      case TIMESTAMP:
+        addLongInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+        return;
+      case FLOAT:
+        addFloatInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+        return;
+      case DOUBLE:
+        addDoubleInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+        return;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        addBinaryInput(groupIds, arguments[0], mask, hlls, maxStandardError);
+        return;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: 
%s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addIntermediate(int[] groupIds, Column argument) {
+    for (int i = 0; i < groupIds.length; i++) {
+      int groupId = groupIds[i];
+      if (!argument.isNull(i)) {
+        HyperLogLog current = new 
HyperLogLog(argument.getBinary(i).getValues());
+        state.merge(groupId, current);
+      }
+    }
+  }
+
+  @Override
+  public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
+    HyperLogLogBigArray hlls = state.getHyperLogLogs();
+    columnBuilder.writeBinary(new Binary(hlls.get(groupId).serialize()));
+  }
+
+  @Override
+  public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+    HyperLogLogBigArray hlls = state.getHyperLogLogs();
+    columnBuilder.writeLong(hlls.get(groupId).cardinality());
+  }
+
+  @Override
+  public void prepareFinal() {}
+
+  @Override
+  public void reset() {
+    state.getHyperLogLogs().reset();
+  }
+
+  public void addBooleanInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getBoolean(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(position)) {
+          hll.add(column.getBoolean(i));
+        }
+      }
+    }
+  }
+
+  public void addIntInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(position)) {
+          hll.add(column.getInt(i));
+        }
+      }
+    }
+  }
+
+  public void addLongInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getLong(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(position)) {
+          hll.add(column.getLong(i));
+        }
+      }
+    }
+  }
+
+  public void addFloatInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getFloat(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(position)) {
+          hll.add(column.getFloat(i));
+        }
+      }
+    }
+  }
+
+  public void addDoubleInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getDouble(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(position)) {
+          hll.add(column.getDouble(i));
+        }
+      }
+    }
+  }
+
+  public void addBinaryInput(
+      int[] groupIds,
+      Column column,
+      AggregationMask mask,
+      HyperLogLogBigArray hlls,
+      double maxStandardError) {
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(i)) {
+          hll.add(column.getBinary(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        HyperLogLog hll = hlls.get(groupId, maxStandardError);
+        if (!column.isNull(position)) {
+          hll.add(column.getBinary(i));
+        }
+      }
+    }
+  }
+
+  public static HyperLogLogBigArray getOrCreateHyperLogLog(
+      HyperLogLogStateFactory.GroupedHyperLogLogState state) {
+    if (state.isEmpty()) {
+      state.setHyperLogLogs(new HyperLogLogBigArray());
+    }
+    return state.getHyperLogLogs();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
index 9346e5da3af..f008e704618 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/BinaryBigArray.java
@@ -33,10 +33,6 @@ public final class BinaryBigArray {
     array = new ObjectBigArray<>();
   }
 
-  public BinaryBigArray(Binary slice) {
-    array = new ObjectBigArray<>(slice);
-  }
-
   /** Returns the size of this big array in bytes. */
   public long sizeOf() {
     return INSTANCE_SIZE + array.sizeOf() + sizeOfBinarys;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/HyperLogLogBigArray.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/HyperLogLogBigArray.java
new file mode 100644
index 00000000000..c036359b936
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/HyperLogLogBigArray.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
+
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOf;
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
+
+/**
+ * A segmented, dynamically-growing array of {@link HyperLogLog} sketches indexed by group id,
+ * with retained-size accounting for memory tracking.
+ */
+public final class HyperLogLogBigArray {
+  private static final long INSTANCE_SIZE = shallowSizeOfInstance(HyperLogLogBigArray.class);
+  private final ObjectBigArray<HyperLogLog> array;
+  // Sum of the estimated sizes of all sketches currently stored.
+  private long sizeOfHyperLogLog;
+
+  public HyperLogLogBigArray() {
+    array = new ObjectBigArray<>();
+  }
+
+  /** Returns the size of this big array in bytes. */
+  public long sizeOf() {
+    // Fix: use array.sizeOf() so the underlying segments are counted, consistent with
+    // BinaryBigArray and MapBigArray; shallowSizeOf(array) measured only the
+    // ObjectBigArray instance itself.
+    return INSTANCE_SIZE + array.sizeOf() + sizeOfHyperLogLog;
+  }
+
+  /** Only use if certain that a sketch exists at {@code index}; may return null otherwise. */
+  public HyperLogLog get(long index) {
+    return array.get(index);
+  }
+
+  /**
+   * Returns the sketch at {@code index}, creating and installing a new one with the given
+   * max standard error when the slot is empty.
+   */
+  public HyperLogLog get(long index, double maxStandardError) {
+    return get(index, new HyperLogLog(maxStandardError));
+  }
+
+  /** Returns the sketch at {@code index}; when the slot is empty, installs and returns {@code hll}. */
+  public HyperLogLog get(long index, HyperLogLog hll) {
+    HyperLogLog result = array.get(index);
+    if (result == null) {
+      set(index, hll);
+      return hll;
+    }
+    return result;
+  }
+
+  public void set(long index, HyperLogLog hll) {
+    // Account for the replaced sketch (if any) before overwriting the slot.
+    updateRetainedSize(index, hll);
+    array.set(index, hll);
+  }
+
+  /** True when no sketch has ever been stored (retained-size accounting is still zero). */
+  public boolean isEmpty() {
+    return sizeOfHyperLogLog == 0;
+  }
+
+  public void ensureCapacity(long length) {
+    array.ensureCapacity(length);
+  }
+
+  /** Adjusts retained-size accounting for replacing the slot at {@code index} with {@code value}. */
+  public void updateRetainedSize(long index, HyperLogLog value) {
+    HyperLogLog hll = array.get(index);
+    if (hll != null) {
+      sizeOfHyperLogLog -= hll.getEstimatedSize();
+    }
+    if (value != null) {
+      sizeOfHyperLogLog += value.getEstimatedSize();
+    }
+  }
+
+  /** Clears every stored sketch in place; allocated sketches are kept for reuse. */
+  public void reset() {
+    array.forEach(
+        item -> {
+          if (item != null) {
+            item.reset();
+          }
+        });
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
index 47659708e03..1be73705b31 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/MapBigArray.java
@@ -35,10 +35,6 @@ public final class MapBigArray {
     array = new ObjectBigArray<>();
   }
 
-  public MapBigArray(HashMap<TsPrimitiveType, Long> slice) {
-    array = new ObjectBigArray<>(slice);
-  }
-
   /** Returns the size of this big array in bytes. */
   public long sizeOf() {
     return INSTANCE_SIZE + array.sizeOf() + sizeOfMaps;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
index 6c5e472d167..51a7401c811 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/ObjectBigArray.java
@@ -36,19 +36,12 @@ public final class ObjectBigArray<T> {
   private static final long INSTANCE_SIZE = 
shallowSizeOfInstance(ObjectBigArray.class);
   private static final long SIZE_OF_SEGMENT = sizeOfObjectArray(SEGMENT_SIZE);
 
-  private final Object initialValue;
-
   private Object[][] array;
   private long capacity;
   private int segments;
 
   /** Creates a new big array containing one initial segment */
   public ObjectBigArray() {
-    this(null);
-  }
-
-  public ObjectBigArray(Object initialValue) {
-    this.initialValue = initialValue;
     array = new Object[INITIAL_SEGMENTS][];
     allocateNewSegment();
   }
@@ -121,7 +114,7 @@ public final class ObjectBigArray<T> {
   }
 
   public void reset() {
-    fill((T) initialValue);
+    fill(null);
   }
 
   public void forEach(Consumer<T> action) {
@@ -185,9 +178,6 @@ public final class ObjectBigArray<T> {
 
   private void allocateNewSegment() {
     Object[] newSegment = new Object[SEGMENT_SIZE];
-    if (initialValue != null) {
-      Arrays.fill(newSegment, initialValue);
-    }
     array[segments] = newSegment;
     capacity += SEGMENT_SIZE;
     segments++;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index 756a6902a5b..bd4fe3f7a56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -630,6 +630,20 @@ public class TableMetadataImpl implements Metadata {
         }
 
         break;
+      case SqlConstant.APPROX_COUNT_DISTINCT:
+        if (argumentTypes.size() != 1 && argumentTypes.size() != 2) {
+          throw new SemanticException(
+              String.format(
+                  "Aggregate functions [%s] should only have two arguments", 
functionName));
+        }
+
+        if (argumentTypes.size() == 2 && 
!isSupportedMathNumericType(argumentTypes.get(1))) {
+          throw new SemanticException(
+              String.format(
+                  "Second argument of Aggregate functions [%s] should be 
numberic type and do not use expression",
+                  functionName));
+        }
+
       case SqlConstant.COUNT:
         break;
       default:
@@ -641,6 +655,7 @@ public class TableMetadataImpl implements Metadata {
       case SqlConstant.COUNT:
       case SqlConstant.COUNT_ALL:
       case SqlConstant.COUNT_IF:
+      case SqlConstant.APPROX_COUNT_DISTINCT:
         return INT64;
       case SqlConstant.FIRST_AGGREGATION:
       case SqlConstant.LAST_AGGREGATION:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 4b9938ce8e7..c829153b4b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -265,6 +265,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSe
 import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.ROLLUP;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName.mapIdentifier;
 import static org.apache.iotdb.db.utils.TimestampPrecisionUtils.currPrecision;
+import static 
org.apache.iotdb.db.utils.constant.SqlConstant.APPROX_COUNT_DISTINCT;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION;
 import static 
org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_AGGREGATION;
@@ -2765,6 +2766,14 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
             "The third argument of 'first_by' or 'last_by' function must be 
'time'",
             ctx);
       }
+    } else if (name.toString().equalsIgnoreCase(APPROX_COUNT_DISTINCT)) {
+      if (arguments.size() == 2
+          && !(arguments.get(1) instanceof DoubleLiteral
+              || arguments.get(1) instanceof LongLiteral
+              || arguments.get(1) instanceof StringLiteral)) {
+        throw new SemanticException(
+            "The second argument of 'approx_count_distinct' function must be a 
literal");
+      }
     }
 
     return new FunctionCall(getLocation(ctx), name, distinct, arguments);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
index 625af893561..63127569488 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
@@ -78,6 +78,8 @@ public class SqlConstant {
   public static final String COUNT_TIME = "count_time";
   public static final String COUNT_TIME_HEADER = "count_time(*)";
 
+  public static final String APPROX_COUNT_DISTINCT = "approx_count_distinct";
+
   // names of scalar functions
   public static final String DIFF = "diff";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
index 3d0510957d1..2dd5d38e5af 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
@@ -57,6 +57,7 @@ public enum TableBuiltinAggregationFunction {
   VARIANCE("variance"),
   VAR_POP("var_pop"),
   VAR_SAMP("var_samp"),
+  APPROX_COUNT_DISTINCT("approx_count_distinct"),
   ;
 
   private final String functionName;
@@ -102,6 +103,7 @@ public enum TableBuiltinAggregationFunction {
       case "variance":
       case "var_pop":
       case "var_samp":
+      case "approx_count_distinct":
         return RowType.anonymous(Collections.emptyList());
       case "extreme":
       case "max":
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index 161c8525cca..c6aa8d1d89f 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -284,7 +284,8 @@ enum TAggregationType {
   LAST_BY,
   MIN,
   MAX,
-  COUNT_ALL
+  COUNT_ALL,
+  APPROX_COUNT_DISTINCT
 }
 
 struct TShowConfigurationTemplateResp {

Reply via email to