This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 59dac3272a6 [TABLE MODEL] Support user-defined aggregate function
59dac3272a6 is described below

commit 59dac3272a6433af0f4bd3c0e01702366aeb4edf
Author: Chen YZ <[email protected]>
AuthorDate: Wed Dec 18 08:37:15 2024 +0800

    [TABLE MODEL] Support user-defined aggregate function
---
 .../apache/iotdb/udf/AggregateFunctionExample.java | 117 ++++
 .../query/udf/example/relational/FirstTwoSum.java  | 168 +++++
 .../db/query/udf/example/relational/MyAvg.java     | 126 ++++
 .../db/query/udf/example/relational/MyCount.java   |  95 +++
 .../it/db/it/udf/IoTDBSQLFunctionManagementIT.java |  58 +-
 .../udf/IoTDBUserDefinedAggregateFunctionIT.java   | 690 +++++++++++++++++++++
 ....java => IoTDBUserDefinedScalarFunctionIT.java} |   8 +-
 .../relational/it/db/it/udf/SQLFunctionUtils.java  |  87 +++
 .../customizer/config/AggregateFunctionConfig.java |  53 ++
 .../udf/api/relational/AggregateFunction.java      |  81 ++-
 .../iotdb/udf/api/relational/access/Record.java    |   3 +-
 .../relational/aggregation/AccumulatorFactory.java |  38 +-
 .../UserDefinedAggregateFunctionAccumulator.java   | 123 ++++
 .../GroupedUserDefinedAggregateAccumulator.java    | 134 ++++
 .../relational/analyzer/ExpressionTreeUtils.java   |   5 +-
 .../relational/metadata/TableMetadataImpl.java     |  26 +-
 .../relational/planner/optimizations/Util.java     |   2 -
 .../iotdb/commons/udf/access/RecordIterator.java   |  10 +-
 .../TableBuiltinAggregationFunction.java           |   4 +-
 .../iotdb/commons/udf/utils/TableUDFUtils.java     |   6 +-
 .../commons/udf/utils/UDFDataTypeTransformer.java  |  45 +-
 21 files changed, 1819 insertions(+), 60 deletions(-)

diff --git 
a/example/udf/src/main/java/org/apache/iotdb/udf/AggregateFunctionExample.java 
b/example/udf/src/main/java/org/apache/iotdb/udf/AggregateFunctionExample.java
new file mode 100644
index 00000000000..62ed28956e1
--- /dev/null
+++ 
b/example/udf/src/main/java/org/apache/iotdb/udf/AggregateFunctionExample.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf;
+
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an internal example of the AggregateFunction implementation.
+ *
+ * <p>CREATE DATABASE test;
+ *
+ * <p>USE test;
+ *
+ * <p>CREATE TABLE t1(device_id STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT);
+ *
+ * <p>INSERT INTO t1(time, device_id, s1, s2) VALUES (1, 'd1', 'a', 1), (2, 'd1', null, 2), (3,
+ * 'd2', 'c', null);
+ *
+ * <p>CREATE FUNCTION my_count AS 'org.apache.iotdb.udf.AggregateFunctionExample';
+ *
+ * <p>SHOW FUNCTIONS;
+ *
+ * <p>SELECT time, device_id, my_count(s1) as s1_count, my_count(s2) as s2_count FROM t1 group by
+ * device_id;
+ *
+ * <p>SELECT time, my_count(s1) as s1_count, my_count(s2) as s2_count FROM t1;
+ */
+public class AggregateFunctionExample implements AggregateFunction {
+
+  /** Intermediate aggregation state: a single running counter. */
+  static class CountState implements State {
+
+    long count;
+
+    /** Sets the counter back to zero. */
+    @Override
+    public void reset() {
+      count = 0;
+    }
+
+    /**
+     * Encodes the counter as one {@code long}.
+     *
+     * @return an 8-byte array holding {@code count}
+     */
+    @Override
+    public byte[] serialize() {
+      // The payload is a long, so the buffer is sized with Long.BYTES (not Double.BYTES).
+      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+      buffer.putLong(count);
+      return buffer.array();
+    }
+
+    /**
+     * Restores the counter from bytes laid out the way {@link #serialize()} writes them.
+     *
+     * @param bytes array whose first eight bytes hold the counter
+     */
+    @Override
+    public void deserialize(byte[] bytes) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      count = buffer.getLong();
+    }
+  }
+
+  /**
+   * Accepts the call only when exactly one argument expression is supplied.
+   *
+   * @param parameters arguments of the function call
+   * @throws UDFParameterNotValidException if the number of arguments is not one
+   */
+  @Override
+  public void validate(FunctionParameters parameters) throws UDFException {
+    if (parameters.getChildExpressionsSize() != 1) {
+      throw new UDFParameterNotValidException("Only one parameter is required.");
+    }
+  }
+
+  /** Declares {@code INT64} as the output type of the aggregation. */
+  @Override
+  public void beforeStart(FunctionParameters parameters, AggregateFunctionConfig configurations) {
+    configurations.setOutputDataType(Type.INT64);
+  }
+
+  /** Creates a fresh {@link CountState}. */
+  @Override
+  public State createState() {
+    return new CountState();
+  }
+
+  /**
+   * Increments the counter when column 0 of the input row is not null.
+   *
+   * @param state a {@link CountState}
+   * @param input the current input row
+   */
+  @Override
+  public void addInput(State state, Record input) {
+    CountState countState = (CountState) state;
+    if (!input.isNull(0)) {
+      countState.count++;
+    }
+  }
+
+  /**
+   * Adds the counter of {@code rhs} into {@code state}; {@code rhs} is left untouched.
+   *
+   * @param state the {@link CountState} that receives the merged count
+   * @param rhs the {@link CountState} whose count is added
+   */
+  @Override
+  public void combineState(State state, State rhs) {
+    CountState countState = (CountState) state;
+    CountState rhsCountState = (CountState) rhs;
+    countState.count += rhsCountState.count;
+  }
+
+  /** Writes the accumulated count to {@code resultValue} as a long. */
+  @Override
+  public void outputFinal(State state, ResultValue resultValue) {
+    CountState countState = (CountState) state;
+    resultValue.setLong(countState.count);
+  }
+}
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/FirstTwoSum.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/FirstTwoSum.java
new file mode 100644
index 00000000000..041f3ca2c89
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/FirstTwoSum.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example.relational;
+
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import java.nio.ByteBuffer;
+
+public class FirstTwoSum implements AggregateFunction {
+
+  /**
+   * Intermediate state: for each of the two value columns, the smallest timestamp accepted so far
+   * and the value that arrived with it. {@code Long.MAX_VALUE} marks a column that has not
+   * contributed a value yet.
+   */
+  static class FirstTwoSumState implements State {
+    long firstTime = Long.MAX_VALUE;
+    long secondTime = Long.MAX_VALUE;
+    double firstValue;
+    double secondValue;
+
+    /** Returns every field to its initial "nothing seen yet" value. */
+    @Override
+    public void reset() {
+      firstTime = Long.MAX_VALUE;
+      secondTime = Long.MAX_VALUE;
+      firstValue = 0;
+      secondValue = 0;
+    }
+
+    /**
+     * Encodes the state as two longs (the timestamps) followed by two doubles (the values).
+     *
+     * @return the encoded state
+     */
+    @Override
+    public byte[] serialize() {
+      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2 + Double.BYTES * 2);
+      buffer.putLong(firstTime);
+      buffer.putLong(secondTime);
+      buffer.putDouble(firstValue);
+      buffer.putDouble(secondValue);
+      return buffer.array();
+    }
+
+    /**
+     * Restores the fields in the same order {@link #serialize()} writes them.
+     *
+     * @param bytes the encoded state
+     */
+    @Override
+    public void deserialize(byte[] bytes) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      firstTime = buffer.getLong();
+      secondTime = buffer.getLong();
+      firstValue = buffer.getDouble();
+      secondValue = buffer.getDouble();
+    }
+  }
+
+  /**
+   * Requires exactly three arguments: two numeric columns (INT32, INT64, FLOAT or DOUBLE)
+   * followed by a TIMESTAMP column.
+   *
+   * @param parameters arguments of the function call
+   * @throws UDFException if the argument count or any argument type does not match
+   */
+  @Override
+  public void validate(FunctionParameters parameters) throws UDFException {
+    if (parameters.getChildExpressionsSize() != 3) {
+      throw new UDFException("FirstTwoSum should accept three column as input");
+    }
+    for (int i = 0; i < 2; i++) {
+      if (parameters.getDataType(i) != Type.INT32
+          && parameters.getDataType(i) != Type.INT64
+          && parameters.getDataType(i) != Type.FLOAT
+          && parameters.getDataType(i) != Type.DOUBLE) {
+        throw new UDFException(
+            "FirstTwoSum should accept INT32, INT64, FLOAT, DOUBLE as the first two inputs");
+      }
+    }
+    if (parameters.getDataType(2) != Type.TIMESTAMP) {
+      throw new UDFException("FirstTwoSum should accept TIMESTAMP as the third input");
+    }
+  }
+
+  /** Declares {@code DOUBLE} as the output type of the aggregation. */
+  @Override
+  public void beforeStart(FunctionParameters parameters, AggregateFunctionConfig configurations) {
+    configurations.setOutputDataType(Type.DOUBLE);
+  }
+
+  /** Creates a fresh {@link FirstTwoSumState}. */
+  @Override
+  public State createState() {
+    return new FirstTwoSumState();
+  }
+
+  /**
+   * Offers one row to the state. For each value column that is not null, the value replaces the
+   * stored one when the row's timestamp (column 2) is strictly smaller than the stored timestamp.
+   *
+   * @param state a {@link FirstTwoSumState}
+   * @param input row holding the two value columns and the time column
+   */
+  @Override
+  public void addInput(State state, Record input) {
+    FirstTwoSumState firstTwoSumState = (FirstTwoSumState) state;
+    // A row whose time column is null cannot be ordered against other rows, so it is skipped
+    // rather than reading a long out of a null cell.
+    if (input.isNull(2)) {
+      return;
+    }
+    long time = input.getLong(2);
+    if (!input.isNull(0) && time < firstTwoSumState.firstTime) {
+      firstTwoSumState.firstTime = time;
+      firstTwoSumState.firstValue = readAsDouble(input, 0);
+    }
+    if (!input.isNull(1) && time < firstTwoSumState.secondTime) {
+      firstTwoSumState.secondTime = time;
+      firstTwoSumState.secondValue = readAsDouble(input, 1);
+    }
+  }
+
+  /**
+   * Reads a numeric column of the row and widens it to {@code double}.
+   *
+   * @param input the current row
+   * @param index column to read; the caller has already checked that it is not null
+   * @return the column value as a double
+   * @throws UDFException if the column is not INT32, INT64, FLOAT or DOUBLE
+   */
+  private static double readAsDouble(Record input, int index) {
+    switch (input.getDataType(index)) {
+      case INT32:
+        return input.getInt(index);
+      case INT64:
+        return input.getLong(index);
+      case FLOAT:
+        return input.getFloat(index);
+      case DOUBLE:
+        return input.getDouble(index);
+      default:
+        throw new UDFException(
+            "FirstTwoSum should accept INT32, INT64, FLOAT, DOUBLE as the first two inputs");
+    }
+  }
+
+  /**
+   * Merges {@code rhs} into {@code state}: per column, the entry with the strictly smaller
+   * timestamp wins. {@code rhs} is left untouched.
+   *
+   * @param state the {@link FirstTwoSumState} that receives the merge
+   * @param rhs the {@link FirstTwoSumState} merged in
+   */
+  @Override
+  public void combineState(State state, State rhs) {
+    FirstTwoSumState firstTwoSumState = (FirstTwoSumState) state;
+    FirstTwoSumState rhsState = (FirstTwoSumState) rhs;
+    if (rhsState.firstTime < firstTwoSumState.firstTime) {
+      firstTwoSumState.firstTime = rhsState.firstTime;
+      firstTwoSumState.firstValue = rhsState.firstValue;
+    }
+    if (rhsState.secondTime < firstTwoSumState.secondTime) {
+      firstTwoSumState.secondTime = rhsState.secondTime;
+      firstTwoSumState.secondValue = rhsState.secondValue;
+    }
+  }
+
+  /**
+   * Emits {@code firstValue + secondValue}, or null when neither column ever contributed a value
+   * (both timestamps are still {@code Long.MAX_VALUE}).
+   */
+  @Override
+  public void outputFinal(State state, ResultValue resultValue) {
+    FirstTwoSumState firstTwoSumState = (FirstTwoSumState) state;
+    if (firstTwoSumState.firstTime == Long.MAX_VALUE
+        && firstTwoSumState.secondTime == Long.MAX_VALUE) {
+      resultValue.setNull();
+    } else {
+      resultValue.setDouble(firstTwoSumState.firstValue + firstTwoSumState.secondValue);
+    }
+  }
+}
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyAvg.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyAvg.java
new file mode 100644
index 00000000000..e8a41e97296
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyAvg.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example.relational;
+
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import java.nio.ByteBuffer;
+
+public class MyAvg implements AggregateFunction {
+
+  static class AvgState implements State {
+    double sum;
+    long count;
+
+    @Override
+    public void reset() {
+      sum = 0;
+      count = 0;
+    }
+
+    @Override
+    public byte[] serialize() {
+      ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
+      buffer.putDouble(sum);
+      buffer.putLong(count);
+
+      return buffer.array();
+    }
+
+    @Override
+    public void deserialize(byte[] bytes) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      sum = buffer.getDouble();
+      count = buffer.getLong();
+    }
+  }
+
+  @Override
+  public void validate(FunctionParameters parameters) throws UDFException {
+    if (parameters.getChildExpressionsSize() != 1) {
+      throw new UDFException("MyAvg only accepts one column as input");
+    }
+    if (parameters.getDataType(0) != Type.INT32
+        && parameters.getDataType(0) != Type.INT64
+        && parameters.getDataType(0) != Type.FLOAT
+        && parameters.getDataType(0) != Type.DOUBLE) {
+      throw new UDFException("MyAvg only accepts INT32, INT64, FLOAT, DOUBLE 
as input");
+    }
+  }
+
+  @Override
+  public void beforeStart(FunctionParameters parameters, 
AggregateFunctionConfig configurations) {
+    configurations.setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public State createState() {
+    return new AvgState();
+  }
+
+  @Override
+  public void addInput(State state, Record input) {
+    if (!input.isNull(0)) {
+      AvgState avgState = (AvgState) state;
+      switch (input.getDataType(0)) {
+        case INT32:
+          avgState.sum += input.getInt(0);
+          break;
+        case INT64:
+          avgState.sum += input.getLong(0);
+          break;
+        case FLOAT:
+          avgState.sum += input.getFloat(0);
+          break;
+        case DOUBLE:
+          avgState.sum += input.getDouble(0);
+          break;
+        default:
+          throw new UDFException("MyAvg only accepts INT32, INT64, FLOAT, 
DOUBLE as input");
+      }
+      avgState.count++;
+    }
+  }
+
+  @Override
+  public void combineState(State state, State rhs) {
+    AvgState avgState = (AvgState) state;
+    AvgState avgRhs = (AvgState) rhs;
+    avgState.sum += avgRhs.sum;
+    avgState.count += avgRhs.count;
+  }
+
+  @Override
+  public void outputFinal(State state, ResultValue resultValue) {
+    AvgState avgState = (AvgState) state;
+    if (avgState.count != 0) {
+      resultValue.setDouble(avgState.sum / avgState.count);
+    } else {
+      resultValue.setNull();
+    }
+  }
+}
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyCount.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyCount.java
new file mode 100644
index 00000000000..de9436bfd86
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyCount.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example.relational;
+
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import java.nio.ByteBuffer;
+
+public class MyCount implements AggregateFunction {
+
+  static class CountState implements State {
+    long count;
+
+    @Override
+    public void reset() {
+      count = 0;
+    }
+
+    @Override
+    public byte[] serialize() {
+      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+      buffer.putLong(count);
+
+      return buffer.array();
+    }
+
+    @Override
+    public void deserialize(byte[] bytes) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      count = buffer.getLong();
+    }
+  }
+
+  @Override
+  public void validate(FunctionParameters parameters) throws UDFException {
+    if (parameters.getChildExpressionsSize() == 0) {
+      throw new UDFException("MyCount accepts at least one parameter");
+    }
+  }
+
+  @Override
+  public void beforeStart(FunctionParameters parameters, 
AggregateFunctionConfig configurations) {
+    configurations.setOutputDataType(Type.INT64);
+  }
+
+  @Override
+  public State createState() {
+    return new CountState();
+  }
+
+  @Override
+  public void addInput(State state, Record input) {
+    CountState countState = (CountState) state;
+    for (int i = 0; i < input.size(); i++) {
+      if (!input.isNull(i)) {
+        countState.count++;
+        break;
+      }
+    }
+  }
+
+  @Override
+  public void combineState(State state, State rhs) {
+    ((CountState) state).count += ((CountState) rhs).count;
+  }
+
+  @Override
+  public void outputFinal(State state, ResultValue resultValue) {
+    resultValue.setLong(((CountState) state).count);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java
index 59824c1c6e8..93fc3070e67 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBSQLFunctionManagementIT.java
@@ -38,6 +38,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_AGG_FUNC;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -88,10 +89,10 @@ public class IoTDBSQLFunctionManagementIT {
             stringBuilder.append(resultSet.getString(i)).append(",");
           }
           String result = stringBuilder.toString();
-          if (result.contains("FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC")) {
+          if (result.contains(FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC)) {
             Assert.assertEquals(
                 String.format(
-                    
"udsf,%s,org.apache.iotdb.db.query.udf.example.relational.ContainNull,AVAILABLE,",
+                    
"UDSF,%s,org.apache.iotdb.db.query.udf.example.relational.ContainNull,AVAILABLE,",
                     FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC),
                 result);
           }
@@ -110,7 +111,58 @@ public class IoTDBSQLFunctionManagementIT {
             stringBuilder.append(resultSet.getString(i)).append(",");
           }
           String result = stringBuilder.toString();
-          if (result.contains("FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC")) {
+          if (result.contains(FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC)) {
+            Assert.fail();
+          }
+          ++count;
+        }
+        Assert.assertEquals(
+            BUILTIN_AGGREGATE_FUNCTIONS_COUNT + 
BUILTIN_SCALAR_FUNCTIONS_COUNT, count);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateShowDropAggregateFunction() {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function udaf as 
'org.apache.iotdb.db.query.udf.example.relational.MyCount'");
+
+      try (ResultSet resultSet = statement.executeQuery("show functions")) {
+        assertEquals(4, resultSet.getMetaData().getColumnCount());
+        int count = 0;
+        while (resultSet.next()) {
+          StringBuilder stringBuilder = new StringBuilder();
+          for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); ++i) {
+            stringBuilder.append(resultSet.getString(i)).append(",");
+          }
+          String result = stringBuilder.toString();
+          if (result.contains(FUNCTION_TYPE_USER_DEFINED_AGG_FUNC)) {
+            Assert.assertEquals(
+                String.format(
+                    
"UDAF,%s,org.apache.iotdb.db.query.udf.example.relational.MyCount,AVAILABLE,",
+                    FUNCTION_TYPE_USER_DEFINED_AGG_FUNC),
+                result);
+          }
+          ++count;
+        }
+        Assert.assertEquals(
+            1 + BUILTIN_AGGREGATE_FUNCTIONS_COUNT + 
BUILTIN_SCALAR_FUNCTIONS_COUNT, count);
+      }
+      statement.execute("drop function udaf");
+      try (ResultSet resultSet = statement.executeQuery("show functions")) {
+        assertEquals(4, resultSet.getMetaData().getColumnCount());
+        int count = 0;
+        while (resultSet.next()) {
+          StringBuilder stringBuilder = new StringBuilder();
+          for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); ++i) {
+            stringBuilder.append(resultSet.getString(i)).append(",");
+          }
+          String result = stringBuilder.toString();
+          if (result.contains(FUNCTION_TYPE_USER_DEFINED_AGG_FUNC)) {
             Assert.fail();
           }
           ++count;
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBUserDefinedAggregateFunctionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBUserDefinedAggregateFunctionIT.java
new file mode 100644
index 00000000000..0a99caff34f
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBUserDefinedAggregateFunctionIT.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it.udf;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBUserDefinedAggregateFunctionIT {
+  /** Database named in the fixture's CREATE DATABASE / USE statements and passed to each query. */
+  private static final String DATABASE_NAME = "test";
+  private static final String[] sqls =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        "CREATE TABLE table1(province STRING ID, city STRING ID, region STRING 
ID, device_id STRING ID, color STRING ATTRIBUTE, type STRING ATTRIBUTE, s1 
INT32 MEASUREMENT, s2 INT64 MEASUREMENT, s3 FLOAT MEASUREMENT, s4 DOUBLE 
MEASUREMENT, s5 BOOLEAN MEASUREMENT, s6 TEXT MEASUREMENT, s7 STRING 
MEASUREMENT, s8 BLOB MEASUREMENT, s9 TIMESTAMP MEASUREMENT, s10 DATE 
MEASUREMENT)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s6,s8,s9) values 
(2024-09-24T06:15:30.000+00:00,'shanghai','shanghai','huangpu','d01','red','A',30,30.0,'shanghai_huangpu_red_A_d01_30',
 X'cafebabe30',2024-09-24T06:15:30.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s3,s4,s6,s7,s9,s10) 
values 
(2024-09-24T06:15:35.000+00:00,'shanghai','shanghai','huangpu','d01','red','A',35000,35.0,35.0,'shanghai_huangpu_red_A_d01_35','shanghai_huangpu_red_A_d01_35',2024-09-24T06:15:35.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s7,s9) values 
(2024-09-24T06:15:40.000+00:00,'shanghai','shanghai','huangpu','d01','red','A',40,40.0,true,'shanghai_huangpu_red_A_d01_40',2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s5,s9,s10) values 
(2024-09-24T06:15:50.000+00:00,'shanghai','shanghai','huangpu','d01','red','A',50000,false,2024-09-24T06:15:50.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'shanghai','shanghai','huangpu','d01','red','A',55,55.0,X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s5,s6,s7,s9) values 
(2024-09-24T06:15:36.000+00:00,'shanghai','shanghai','huangpu','d02','red','BBBBBBBBBBBBBBBB',36,true,'shanghai_huangpu_red_B_d02_36','shanghai_huangpu_red_B_d02_36',2024-09-24T06:15:36.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:40.000+00:00,'shanghai','shanghai','huangpu','d02','red','BBBBBBBBBBBBBBBB',40,40.0,'shanghai_huangpu_red_B_d02_40',2024-09-24T06:15:40.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s7,s8,s9) values 
(2024-09-24T06:15:50.000+00:00,'shanghai','shanghai','huangpu','d02','red','BBBBBBBBBBBBBBBB',50000,'shanghai_huangpu_red_B_d02_50',X'cafebabe50',2024-09-24T06:15:50.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s8,s9) values 
(2024-09-24T06:15:31.000+00:00,'shanghai','shanghai','huangpu','d03','yellow','A',31000,X'cafebabe31',2024-09-24T06:15:31.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:36.000+00:00,'shanghai','shanghai','huangpu','d03','yellow','A',36,36.0,'shanghai_huangpu_yellow_A_d03_36',2024-09-24T06:15:36.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s6,s8,s9) values 
(2024-09-24T06:15:41.000+00:00,'shanghai','shanghai','huangpu','d03','yellow','A',41,41.0,false,'shanghai_huangpu_yellow_A_d03_41',X'cafebabe41',2024-09-24T06:15:41.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s4,s7,s9) values 
(2024-09-24T06:15:46.000+00:00,'shanghai','shanghai','huangpu','d03','yellow','A',46000,46.0,'shanghai_huangpu_yellow_A_d03_46',2024-09-24T06:15:46.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s6,s9) values 
(2024-09-24T06:15:51.000+00:00,'shanghai','shanghai','huangpu','d03','yellow','A',51.0,'shanghai_huangpu_yellow_A_d03_51',2024-09-24T06:15:51.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s5,s7,s9,s10) values 
(2024-09-24T06:15:30.000+00:00,'shanghai','shanghai','huangpu','d04','yellow','BBBBBBBBBBBBBBBB',30.0,true,'shanghai_huangpu_yellow_B_d04_30',2024-09-24T06:15:30.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s9) values 
(2024-09-24T06:15:40.000+00:00,'shanghai','shanghai','huangpu','d04','yellow','BBBBBBBBBBBBBBBB',40000,2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s6,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'shanghai','shanghai','huangpu','d04','yellow','BBBBBBBBBBBBBBBB',55,55.0,'shanghai_huangpu_yellow_B_d04_55',X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s6,s8,s9) values 
(2024-09-24T06:15:30.000+00:00,'shanghai','shanghai','pudong','d05','red','A',30,30.0,'shanghai_pudong_red_A_d05_30',
 X'cafebabe30',2024-09-24T06:15:30.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s3,s4,s6,s7,s9,s10) 
values 
(2024-09-24T06:15:35.000+00:00,'shanghai','shanghai','pudong','d05','red','A',35000,35.0,35.0,'shanghai_pudong_red_A_d05_35','shanghai_pudong_red_A_d05_35',2024-09-24T06:15:35.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s7,s9) values 
(2024-09-24T06:15:40.000+00:00,'shanghai','shanghai','pudong','d05','red','A',40,40.0,true,'shanghai_pudong_red_A_d05_40',2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s5,s9,s10) values 
(2024-09-24T06:15:50.000+00:00,'shanghai','shanghai','pudong','d05','red','A',50000,false,2024-09-24T06:15:50.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'shanghai','shanghai','pudong','d05','red','A',55,55.0,X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s5,s6,s7,s9) values 
(2024-09-24T06:15:36.000+00:00,'shanghai','shanghai','pudong','d06','red','BBBBBBBBBBBBBBBB',36,true,'shanghai_pudong_red_B_d06_36','shanghai_pudong_red_B_d06_36',2024-09-24T06:15:36.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:40.000+00:00,'shanghai','shanghai','pudong','d06','red','BBBBBBBBBBBBBBBB',40,40.0,'shanghai_pudong_red_B_d06_40',2024-09-24T06:15:40.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s7,s8,s9) values 
(2024-09-24T06:15:50.000+00:00,'shanghai','shanghai','pudong','d06','red','BBBBBBBBBBBBBBBB',50000,'shanghai_pudong_red_B_d06_50',X'cafebabe50',2024-09-24T06:15:50.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s8,s9) values 
(2024-09-24T06:15:31.000+00:00,'shanghai','shanghai','pudong','d07','yellow','A',31000,X'cafebabe31',2024-09-24T06:15:31.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:36.000+00:00,'shanghai','shanghai','pudong','d07','yellow','A',36,36.0,'shanghai_pudong_yellow_A_d07_36',2024-09-24T06:15:36.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s6,s8,s9) values 
(2024-09-24T06:15:41.000+00:00,'shanghai','shanghai','pudong','d07','yellow','A',41,41.0,false,'shanghai_pudong_yellow_A_d07_41',X'cafebabe41',2024-09-24T06:15:41.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s4,s7,s9) values 
(2024-09-24T06:15:46.000+00:00,'shanghai','shanghai','pudong','d07','yellow','A',46000,46.0,'shanghai_pudong_yellow_A_d07_46',2024-09-24T06:15:46.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s6,s9) values 
(2024-09-24T06:15:51.000+00:00,'shanghai','shanghai','pudong','d07','yellow','A',51.0,'shanghai_pudong_yellow_A_d07_51',2024-09-24T06:15:51.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s5,s7,s9,s10) values 
(2024-09-24T06:15:30.000+00:00,'shanghai','shanghai','pudong','d08','yellow','BBBBBBBBBBBBBBBB',30.0,true,'shanghai_pudong_yellow_B_d08_30',2024-09-24T06:15:30.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s9) values 
(2024-09-24T06:15:40.000+00:00,'shanghai','shanghai','pudong','d08','yellow','BBBBBBBBBBBBBBBB',40000,2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s6,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'shanghai','shanghai','pudong','d08','yellow','BBBBBBBBBBBBBBBB',55,55.0,'shanghai_pudong_yellow_B_d08_55',X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s6,s8,s9) values 
(2024-09-24T06:15:30.000+00:00,'beijing','beijing','chaoyang','d09','red','A',30,30.0,'beijing_chaoyang_red_A_d09_30',
 X'cafebabe30',2024-09-24T06:15:30.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s3,s4,s6,s7,s9,s10) 
values 
(2024-09-24T06:15:35.000+00:00,'beijing','beijing','chaoyang','d09','red','A',35000,35.0,35.0,'beijing_chaoyang_red_A_d09_35','beijing_chaoyang_red_A_d09_35',2024-09-24T06:15:35.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s7,s9) values 
(2024-09-24T06:15:40.000+00:00,'beijing','beijing','chaoyang','d09','red','A',40,40.0,true,'beijing_chaoyang_red_A_d09_40',2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s5,s9,s10) values 
(2024-09-24T06:15:50.000+00:00,'beijing','beijing','chaoyang','d09','red','A',50000,false,2024-09-24T06:15:50.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'beijing','beijing','chaoyang','d09','red','A',55,55.0,X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s5,s6,s7,s9) values 
(2024-09-24T06:15:36.000+00:00,'beijing','beijing','chaoyang','d10','red','BBBBBBBBBBBBBBBB',36,true,'beijing_chaoyang_red_B_d10_36','beijing_chaoyang_red_B_d10_36',2024-09-24T06:15:36.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:40.000+00:00,'beijing','beijing','chaoyang','d10','red','BBBBBBBBBBBBBBBB',40,40.0,'beijing_chaoyang_red_B_d10_40',2024-09-24T06:15:40.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s7,s8,s9) values 
(2024-09-24T06:15:50.000+00:00,'beijing','beijing','chaoyang','d10','red','BBBBBBBBBBBBBBBB',50000,'beijing_chaoyang_red_B_d10_50',X'cafebabe50',2024-09-24T06:15:50.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s8,s9) values 
(2024-09-24T06:15:31.000+00:00,'beijing','beijing','chaoyang','d11','yellow','A',31000,X'cafebabe31',2024-09-24T06:15:31.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:36.000+00:00,'beijing','beijing','chaoyang','d11','yellow','A',36,36.0,'beijing_chaoyang_yellow_A_d11_36',2024-09-24T06:15:36.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s6,s8,s9) values 
(2024-09-24T06:15:41.000+00:00,'beijing','beijing','chaoyang','d11','yellow','A',41,41.0,false,'beijing_chaoyang_yellow_A_d11_41',X'cafebabe41',2024-09-24T06:15:41.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s4,s7,s9) values 
(2024-09-24T06:15:46.000+00:00,'beijing','beijing','chaoyang','d11','yellow','A',46000,46.0,'beijing_chaoyang_yellow_A_d11_46',2024-09-24T06:15:46.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s6,s9) values 
(2024-09-24T06:15:51.000+00:00,'beijing','beijing','chaoyang','d11','yellow','A',51.0,'beijing_chaoyang_yellow_A_d11_51',2024-09-24T06:15:51.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s5,s7,s9,s10) values 
(2024-09-24T06:15:30.000+00:00,'beijing','beijing','chaoyang','d12','yellow','BBBBBBBBBBBBBBBB',30.0,true,'beijing_chaoyang_yellow_B_d12_30',2024-09-24T06:15:30.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s9) values 
(2024-09-24T06:15:40.000+00:00,'beijing','beijing','chaoyang','d12','yellow','BBBBBBBBBBBBBBBB',40000,2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s6,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'beijing','beijing','chaoyang','d12','yellow','BBBBBBBBBBBBBBBB',55,55.0,'beijing_chaoyang_yellow_B_d12_55',X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s6,s8,s9) values 
(2024-09-24T06:15:30.000+00:00,'beijing','beijing','haidian','d13','red','A',30,30.0,'beijing_haidian_red_A_d13_30',
 X'cafebabe30',2024-09-24T06:15:30.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s3,s4,s6,s7,s9,s10) 
values 
(2024-09-24T06:15:35.000+00:00,'beijing','beijing','haidian','d13','red','A',35000,35.0,35.0,'beijing_haidian_red_A_d13_35','beijing_haidian_red_A_d13_35',2024-09-24T06:15:35.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s7,s9) values 
(2024-09-24T06:15:40.000+00:00,'beijing','beijing','haidian','d13','red','A',40,40.0,true,'beijing_haidian_red_A_d13_40',2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s5,s9,s10) values 
(2024-09-24T06:15:50.000+00:00,'beijing','beijing','haidian','d13','red','A',50000,false,2024-09-24T06:15:50.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'beijing','beijing','haidian','d13','red','A',55,55.0,X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s5,s6,s7,s9) values 
(2024-09-24T06:15:36.000+00:00,'beijing','beijing','haidian','d14','red','BBBBBBBBBBBBBBBB',36,true,'beijing_haidian_red_B_d14_36','beijing_haidian_red_B_d14_36',2024-09-24T06:15:36.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:40.000+00:00,'beijing','beijing','haidian','d14','red','BBBBBBBBBBBBBBBB',40,40.0,'beijing_haidian_red_B_d14_40',2024-09-24T06:15:40.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s7,s8,s9) values 
(2024-09-24T06:15:50.000+00:00,'beijing','beijing','haidian','d14','red','BBBBBBBBBBBBBBBB',50000,'beijing_haidian_red_B_d14_50',X'cafebabe50',2024-09-24T06:15:50.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s8,s9) values 
(2024-09-24T06:15:31.000+00:00,'beijing','beijing','haidian','d15','yellow','A',31000,X'cafebabe31',2024-09-24T06:15:31.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s7,s9,s10) values 
(2024-09-24T06:15:36.000+00:00,'beijing','beijing','haidian','d15','yellow','A',36,36.0,'beijing_haidian_yellow_A_d15_36',2024-09-24T06:15:36.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s3,s5,s6,s8,s9) values 
(2024-09-24T06:15:41.000+00:00,'beijing','beijing','haidian','d15','yellow','A',41,41.0,false,'beijing_haidian_yellow_A_d15_41',X'cafebabe41',2024-09-24T06:15:41.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s4,s7,s9) values 
(2024-09-24T06:15:46.000+00:00,'beijing','beijing','haidian','d15','yellow','A',46000,46.0,'beijing_haidian_yellow_A_d15_46',2024-09-24T06:15:46.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s6,s9) values 
(2024-09-24T06:15:51.000+00:00,'beijing','beijing','haidian','d15','yellow','A',51.0,'beijing_haidian_yellow_A_d15_51',2024-09-24T06:15:51.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s3,s5,s7,s9,s10) values 
(2024-09-24T06:15:30.000+00:00,'beijing','beijing','haidian','d16','yellow','BBBBBBBBBBBBBBBB',30.0,true,'beijing_haidian_yellow_B_d16_30',2024-09-24T06:15:30.000+00:00,'2024-09-24')",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s2,s9) values 
(2024-09-24T06:15:40.000+00:00,'beijing','beijing','haidian','d16','yellow','BBBBBBBBBBBBBBBB',40000,2024-09-24T06:15:40.000+00:00)",
+        "INSERT INTO 
table1(time,province,city,region,device_id,color,type,s1,s4,s6,s8,s9) values 
(2024-09-24T06:15:55.000+00:00,'beijing','beijing','haidian','d16','yellow','BBBBBBBBBBBBBBBB',55,55.0,'beijing_haidian_yellow_B_d16_55',X'cafebabe55',2024-09-24T06:15:55.000+00:00)",
+        "FLUSH",
+        "CLEAR ATTRIBUTE CACHE",
+      };
+
+  /**
+   * Class-level setup: initializes the cluster environment obtained from {@code EnvFactory} and
+   * then executes the fixture statements through {@link #insertData()}.
+   *
+   * @throws Exception if initializing the environment fails
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  /**
+   * Class-level teardown: asks the environment obtained from {@code EnvFactory} to clean up the
+   * cluster once all tests in this class have finished.
+   *
+   * @throws Exception if cleaning the environment fails
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /**
+   * Runs after each test and calls {@code SQLFunctionUtils.dropAllUDF()}.
+   *
+   * <p>NOTE(review): presumably this removes the functions a test registered through {@code
+   * SQLFunctionUtils.createUDF} so that tests stay independent; confirm against the helper.
+   */
+  @After
+  public void dropFunction() {
+    SQLFunctionUtils.dropAllUDF();
+  }
+
+  /**
+   * Executes every statement in {@link #sqls}, in array order, over a single table connection.
+   *
+   * <p>Each statement is echoed to standard output before it is executed. The connection and the
+   * statement are closed by try-with-resources. Any exception stops the load and fails the run;
+   * the failure message carries the exception so the root cause is visible in the test report.
+   */
+  private static void insertData() {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : sqls) {
+        System.out.println(sql + ";");
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      // Include the exception (class and message) instead of failing with an opaque text only.
+      fail("insertData failed: " + e);
+    }
+  }
+
+  @Test
+  public void testMyCount() {
+    SQLFunctionUtils.createUDF(
+        "my_count", 
"org.apache.iotdb.db.query.udf.example.relational.MyCount");
+    String[] expectedHeader = new String[] {"_col0"};
+    String[] retArray =
+        new String[] {
+          "5,",
+        };
+    tableResultSetEqualTest(
+        "select my_count(time) from table1 where device_id = 'd01'",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"_col0", "end_time", "device_id", "_col3"};
+    retArray =
+        new String[] {
+          "2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,d01,1,",
+          "2024-09-24T06:15:35.000Z,2024-09-24T06:15:40.000Z,d01,1,",
+          "2024-09-24T06:15:40.000Z,2024-09-24T06:15:45.000Z,d01,1,",
+          "2024-09-24T06:15:50.000Z,2024-09-24T06:15:55.000Z,d01,1,",
+          "2024-09-24T06:15:55.000Z,2024-09-24T06:16:00.000Z,d01,1,",
+        };
+    tableResultSetEqualTest(
+        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, my_count(time) from table1 where device_id = 'd01' group by 
1,device_id",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"_col0", "province", "city", "region", 
"device_id", "_col5"};
+    retArray =
+        new String[] {
+          "2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d09,1,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d09,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d09,1,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d09,1,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,chaoyang,d09,1,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d10,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d10,1,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d10,1,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d11,1,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d11,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d11,1,",
+          "2024-09-24T06:15:45.000Z,beijing,beijing,chaoyang,d11,1,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d11,1,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d12,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d12,1,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,chaoyang,d12,1,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d13,1,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d13,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d13,1,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d13,1,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,haidian,d13,1,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d14,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d14,1,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d14,1,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d15,1,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d15,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d15,1,",
+          "2024-09-24T06:15:45.000Z,beijing,beijing,haidian,d15,1,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d15,1,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d16,1,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d16,1,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,haidian,d16,1,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d01,1,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d01,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d01,1,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d01,1,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,huangpu,d01,1,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d02,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d02,1,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d02,1,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d03,1,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d03,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d03,1,",
+          "2024-09-24T06:15:45.000Z,shanghai,shanghai,huangpu,d03,1,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d03,1,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d04,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d04,1,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,huangpu,d04,1,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d05,1,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d05,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d05,1,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d05,1,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,pudong,d05,1,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d06,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d06,1,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d06,1,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d07,1,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d07,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d07,1,",
+          "2024-09-24T06:15:45.000Z,shanghai,shanghai,pudong,d07,1,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d07,1,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d08,1,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d08,1,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,pudong,d08,1,",
+        };
+    tableResultSetEqualTest(
+        "select date_bin(5s, time),province,city,region,device_id, 
my_count(time) from table1 group by 1,2,3,4,5 order by 2,3,4,5,1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader =
+        new String[] {
+          "_col0",
+          "province",
+          "city",
+          "region",
+          "device_id",
+          "_col5",
+          "_col6",
+          "_col7",
+          "_col8",
+          "_col9",
+          "_col10",
+          "_col11",
+          "_col12",
+          "_col13",
+          "_col14"
+        };
+    retArray =
+        new String[] {
+          
"2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d09,1,0,1,0,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d09,0,1,1,1,0,1,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d09,1,0,1,0,1,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d09,0,1,0,0,1,0,0,0,1,1,",
+          
"2024-09-24T06:15:55.000Z,beijing,beijing,chaoyang,d09,1,0,0,1,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d10,1,0,0,0,1,1,1,0,1,0,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d10,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d10,0,1,0,0,0,0,1,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d11,0,1,0,0,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d11,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d11,1,0,1,0,1,1,0,1,1,0,",
+          
"2024-09-24T06:15:45.000Z,beijing,beijing,chaoyang,d11,0,1,0,1,0,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d11,0,0,1,0,0,1,0,0,1,0,",
+          
"2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d12,0,0,1,0,1,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d12,0,1,0,0,0,0,0,0,1,0,",
+          
"2024-09-24T06:15:55.000Z,beijing,beijing,chaoyang,d12,1,0,0,1,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d13,1,0,1,0,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d13,0,1,1,1,0,1,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d13,1,0,1,0,1,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d13,0,1,0,0,1,0,0,0,1,1,",
+          
"2024-09-24T06:15:55.000Z,beijing,beijing,haidian,d13,1,0,0,1,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d14,1,0,0,0,1,1,1,0,1,0,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d14,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d14,0,1,0,0,0,0,1,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d15,0,1,0,0,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d15,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d15,1,0,1,0,1,1,0,1,1,0,",
+          
"2024-09-24T06:15:45.000Z,beijing,beijing,haidian,d15,0,1,0,1,0,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d15,0,0,1,0,0,1,0,0,1,0,",
+          
"2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d16,0,0,1,0,1,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d16,0,1,0,0,0,0,0,0,1,0,",
+          
"2024-09-24T06:15:55.000Z,beijing,beijing,haidian,d16,1,0,0,1,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d01,1,0,1,0,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d01,0,1,1,1,0,1,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d01,1,0,1,0,1,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d01,0,1,0,0,1,0,0,0,1,1,",
+          
"2024-09-24T06:15:55.000Z,shanghai,shanghai,huangpu,d01,1,0,0,1,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d02,1,0,0,0,1,1,1,0,1,0,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d02,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d02,0,1,0,0,0,0,1,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d03,0,1,0,0,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d03,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d03,1,0,1,0,1,1,0,1,1,0,",
+          
"2024-09-24T06:15:45.000Z,shanghai,shanghai,huangpu,d03,0,1,0,1,0,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d03,0,0,1,0,0,1,0,0,1,0,",
+          
"2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d04,0,0,1,0,1,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d04,0,1,0,0,0,0,0,0,1,0,",
+          
"2024-09-24T06:15:55.000Z,shanghai,shanghai,huangpu,d04,1,0,0,1,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d05,1,0,1,0,0,1,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d05,0,1,1,1,0,1,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d05,1,0,1,0,1,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d05,0,1,0,0,1,0,0,0,1,1,",
+          
"2024-09-24T06:15:55.000Z,shanghai,shanghai,pudong,d05,1,0,0,1,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d06,1,0,0,0,1,1,1,0,1,0,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d06,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d06,0,1,0,0,0,0,1,1,1,0,",
+          
"2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d07,0,1,0,0,0,0,0,1,1,0,",
+          
"2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d07,1,0,0,1,0,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d07,1,0,1,0,1,1,0,1,1,0,",
+          
"2024-09-24T06:15:45.000Z,shanghai,shanghai,pudong,d07,0,1,0,1,0,0,1,0,1,0,",
+          
"2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d07,0,0,1,0,0,1,0,0,1,0,",
+          
"2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d08,0,0,1,0,1,0,1,0,1,1,",
+          
"2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d08,0,1,0,0,0,0,0,0,1,0,",
+          
"2024-09-24T06:15:55.000Z,shanghai,shanghai,pudong,d08,1,0,0,1,0,1,0,1,1,0,",
+        };
+    tableResultSetEqualTest(
+        "select date_bin(5s, time),province,city,region,device_id, 
my_count(s1), my_count(s2), my_count(s3), my_count(s4), my_count(s5), 
my_count(s6), my_count(s7), my_count(s8), my_count(s9), my_count(s10) from 
table1 group by 1,2,3,4,5 order by 2,3,4,5,1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader =
+        new String[] {
+          "province",
+          "city",
+          "region",
+          "device_id",
+          "_col4",
+          "_col5",
+          "_col6",
+          "_col7",
+          "_col8",
+          "_col9",
+          "_col10",
+          "_col11",
+          "_col12",
+          "_col13"
+        };
+    retArray =
+        new String[] {
+          "beijing,beijing,chaoyang,d09,3,2,3,2,2,2,2,2,5,2,",
+          "beijing,beijing,chaoyang,d10,2,1,0,1,1,1,3,1,3,1,",
+          "beijing,beijing,chaoyang,d11,2,2,2,2,1,2,2,2,5,1,",
+          "beijing,beijing,chaoyang,d12,1,1,1,1,1,1,1,1,3,1,",
+          "beijing,beijing,haidian,d13,3,2,3,2,2,2,2,2,5,2,",
+          "beijing,beijing,haidian,d14,2,1,0,1,1,1,3,1,3,1,",
+          "beijing,beijing,haidian,d15,2,2,2,2,1,2,2,2,5,1,",
+          "beijing,beijing,haidian,d16,1,1,1,1,1,1,1,1,3,1,",
+          "shanghai,shanghai,huangpu,d01,3,2,3,2,2,2,2,2,5,2,",
+          "shanghai,shanghai,huangpu,d02,2,1,0,1,1,1,3,1,3,1,",
+          "shanghai,shanghai,huangpu,d03,2,2,2,2,1,2,2,2,5,1,",
+          "shanghai,shanghai,huangpu,d04,1,1,1,1,1,1,1,1,3,1,",
+          "shanghai,shanghai,pudong,d05,3,2,3,2,2,2,2,2,5,2,",
+          "shanghai,shanghai,pudong,d06,2,1,0,1,1,1,3,1,3,1,",
+          "shanghai,shanghai,pudong,d07,2,2,2,2,1,2,2,2,5,1,",
+          "shanghai,shanghai,pudong,d08,1,1,1,1,1,1,1,1,3,1,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,region,device_id, my_count(s1), my_count(s2), 
my_count(s3), my_count(s4), my_count(s5), my_count(s6), my_count(s7), 
my_count(s8), my_count(s9), my_count(s10) from table1 group by 1,2,3,4 order by 
1,2,3,4",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"province", "city", "region", "device_id", 
"_col4"};
+    retArray =
+        new String[] {
+          "beijing,beijing,chaoyang,d09,5,",
+          "beijing,beijing,chaoyang,d10,3,",
+          "beijing,beijing,chaoyang,d11,5,",
+          "beijing,beijing,chaoyang,d12,3,",
+          "beijing,beijing,haidian,d13,5,",
+          "beijing,beijing,haidian,d14,3,",
+          "beijing,beijing,haidian,d15,5,",
+          "beijing,beijing,haidian,d16,3,",
+          "shanghai,shanghai,huangpu,d01,5,",
+          "shanghai,shanghai,huangpu,d02,3,",
+          "shanghai,shanghai,huangpu,d03,5,",
+          "shanghai,shanghai,huangpu,d04,3,",
+          "shanghai,shanghai,pudong,d05,5,",
+          "shanghai,shanghai,pudong,d06,3,",
+          "shanghai,shanghai,pudong,d07,5,",
+          "shanghai,shanghai,pudong,d08,3,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,region,device_id,my_count(time) from table1 
group by 1,2,3,4 order by 1,2,3,4",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"province", "city", "region", "_col3"};
+    retArray =
+        new String[] {
+          "beijing,beijing,chaoyang,16,",
+          "beijing,beijing,haidian,16,",
+          "shanghai,shanghai,huangpu,16,",
+          "shanghai,shanghai,pudong,16,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,region,my_count(time) from table1 group by 1,2,3 
order by 1,2,3",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"province", "city", "_col2"};
+    retArray =
+        new String[] {
+          "beijing,beijing,32,", "shanghai,shanghai,32,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,my_count(time) from table1 group by 1,2 order by 
1,2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"province", "_col1"};
+    retArray =
+        new String[] {
+          "beijing,32,", "shanghai,32,",
+        };
+    tableResultSetEqualTest(
+        "select province,my_count(time) from table1 group by 1 order by 1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"_col0"};
+    retArray =
+        new String[] {
+          "64,",
+        };
+    tableResultSetEqualTest(
+        "select my_count(time) from table1", expectedHeader, retArray, 
DATABASE_NAME);
+  }
+
+  /**
+   * Registers the example {@code MyAvg} class as the function {@code my_avg} and compares its
+   * results on {@code table1} with hard-coded expectations, moving from filtered single-device
+   * queries to coarser groupings and finally a global aggregate.
+   */
+  @Test
+  public void testMyAvg() {
+    SQLFunctionUtils.createUDF("my_avg", 
"org.apache.iotdb.db.query.udf.example.relational.MyAvg");
+    // Case 1: one device restricted to a time range, grouped by device_id and two more columns.
+    String[] expectedHeader = new String[] {"device_id", "color", "type", 
"_col3"};
+    String[] retArray =
+        new String[] {
+          "d01,red,A,45.0,",
+        };
+    tableResultSetEqualTest(
+        "select device_id, color, type, my_avg(s4) from table1 where time >= 
2024-09-24T06:15:30.000+00:00 and time <= 2024-09-24T06:15:59.999+00:00 and 
device_id = 'd01' group by device_id, color, type",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // Case 2: same query with an additional value filter on s1.
+    expectedHeader = new String[] {"device_id", "color", "type", "_col3"};
+    retArray =
+        new String[] {
+          "d01,red,A,55.0,",
+        };
+    tableResultSetEqualTest(
+        "select device_id, color, type, my_avg(s4) from table1 where time >= 
2024-09-24T06:15:30.000+00:00 and time <= 2024-09-24T06:15:59.999+00:00 and 
device_id = 'd01' and s1 >= 40 group by device_id, color, type",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // Case 3: 5-second date_bin windows for one device. Some windows expect "null",
+    // presumably because they hold no non-null s3 value - TODO confirm against the fixture data.
+    // NOTE(review): this query has no ORDER BY although the expected rows are listed in time
+    // order; confirm the result order is guaranteed here.
+    expectedHeader = new String[] {"_col0", "device_id", "_col2"};
+    retArray =
+        new String[] {
+          "2024-09-24T06:15:30.000Z,d01,30.0,",
+          "2024-09-24T06:15:35.000Z,d01,35.0,",
+          "2024-09-24T06:15:40.000Z,d01,40.0,",
+          "2024-09-24T06:15:50.000Z,d01,null,",
+          "2024-09-24T06:15:55.000Z,d01,null,",
+        };
+    tableResultSetEqualTest(
+        "select date_bin(5s, time), device_id, my_avg(s3) from table1 where 
device_id = 'd01' group by 1, 2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // Case 4: 5-second windows for every device, sorted by the grouping columns and then time.
+    expectedHeader = new String[] {"_col0", "province", "city", "region", 
"device_id", "_col5"};
+    retArray =
+        new String[] {
+          "2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d09,null,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d09,35.0,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d09,null,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d09,null,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,chaoyang,d09,55.0,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d10,null,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d10,40.0,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d10,null,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d11,null,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,chaoyang,d11,36.0,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d11,null,",
+          "2024-09-24T06:15:45.000Z,beijing,beijing,chaoyang,d11,46.0,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,chaoyang,d11,null,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,chaoyang,d12,null,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,chaoyang,d12,null,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,chaoyang,d12,55.0,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d13,null,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d13,35.0,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d13,null,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d13,null,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,haidian,d13,55.0,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d14,null,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d14,40.0,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d14,null,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d15,null,",
+          "2024-09-24T06:15:35.000Z,beijing,beijing,haidian,d15,36.0,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d15,null,",
+          "2024-09-24T06:15:45.000Z,beijing,beijing,haidian,d15,46.0,",
+          "2024-09-24T06:15:50.000Z,beijing,beijing,haidian,d15,null,",
+          "2024-09-24T06:15:30.000Z,beijing,beijing,haidian,d16,null,",
+          "2024-09-24T06:15:40.000Z,beijing,beijing,haidian,d16,null,",
+          "2024-09-24T06:15:55.000Z,beijing,beijing,haidian,d16,55.0,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d01,null,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d01,35.0,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d01,null,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d01,null,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,huangpu,d01,55.0,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d02,null,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d02,40.0,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d02,null,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d03,null,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,huangpu,d03,36.0,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d03,null,",
+          "2024-09-24T06:15:45.000Z,shanghai,shanghai,huangpu,d03,46.0,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,huangpu,d03,null,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,huangpu,d04,null,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,huangpu,d04,null,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,huangpu,d04,55.0,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d05,null,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d05,35.0,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d05,null,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d05,null,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,pudong,d05,55.0,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d06,null,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d06,40.0,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d06,null,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d07,null,",
+          "2024-09-24T06:15:35.000Z,shanghai,shanghai,pudong,d07,36.0,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d07,null,",
+          "2024-09-24T06:15:45.000Z,shanghai,shanghai,pudong,d07,46.0,",
+          "2024-09-24T06:15:50.000Z,shanghai,shanghai,pudong,d07,null,",
+          "2024-09-24T06:15:30.000Z,shanghai,shanghai,pudong,d08,null,",
+          "2024-09-24T06:15:40.000Z,shanghai,shanghai,pudong,d08,null,",
+          "2024-09-24T06:15:55.000Z,shanghai,shanghai,pudong,d08,55.0,",
+        };
+    tableResultSetEqualTest(
+        "select date_bin(5s, time),province,city,region,device_id, my_avg(s4) 
from table1 group by 1,2,3,4,5 order by 2,3,4,5,1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // Case 5: per-device average of s2 over the whole table.
+    expectedHeader = new String[] {"province", "city", "region", "device_id", 
"_col4"};
+    retArray =
+        new String[] {
+          "beijing,beijing,chaoyang,d09,42500.0,",
+          "beijing,beijing,chaoyang,d10,50000.0,",
+          "beijing,beijing,chaoyang,d11,38500.0,",
+          "beijing,beijing,chaoyang,d12,40000.0,",
+          "beijing,beijing,haidian,d13,42500.0,",
+          "beijing,beijing,haidian,d14,50000.0,",
+          "beijing,beijing,haidian,d15,38500.0,",
+          "beijing,beijing,haidian,d16,40000.0,",
+          "shanghai,shanghai,huangpu,d01,42500.0,",
+          "shanghai,shanghai,huangpu,d02,50000.0,",
+          "shanghai,shanghai,huangpu,d03,38500.0,",
+          "shanghai,shanghai,huangpu,d04,40000.0,",
+          "shanghai,shanghai,pudong,d05,42500.0,",
+          "shanghai,shanghai,pudong,d06,50000.0,",
+          "shanghai,shanghai,pudong,d07,38500.0,",
+          "shanghai,shanghai,pudong,d08,40000.0,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,region,device_id, my_avg(s2) from table1 group 
by 1,2,3,4 order by 1,2,3,4",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // Cases 6-8: average of s4 grouped by progressively fewer columns.
+    expectedHeader = new String[] {"province", "city", "region", "_col3"};
+    retArray =
+        new String[] {
+          "beijing,beijing,chaoyang,44.5,",
+          "beijing,beijing,haidian,44.5,",
+          "shanghai,shanghai,huangpu,44.5,",
+          "shanghai,shanghai,pudong,44.5,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,region,my_avg(s4) from table1 group by 1,2,3 
order by 1,2,3",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"province", "city", "_col2"};
+    retArray =
+        new String[] {
+          "beijing,beijing,44.5,", "shanghai,shanghai,44.5,",
+        };
+    tableResultSetEqualTest(
+        "select province,city,my_avg(s4) from table1 group by 1,2 order by 
1,2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    expectedHeader = new String[] {"province", "_col1"};
+    retArray =
+        new String[] {
+          "beijing,44.5,", "shanghai,44.5,",
+        };
+    tableResultSetEqualTest(
+        "select province,my_avg(s4) from table1 group by 1 order by 1",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // Case 9: global aggregate without GROUP BY, expected to yield a single row.
+    expectedHeader = new String[] {"_col0"};
+    retArray = new String[] {"44.5,"};
+    tableResultSetEqualTest(
+        "select my_avg(s4) from table1", expectedHeader, retArray, 
DATABASE_NAME);
+  }
+
+  @Test
+  public void testIllegalInput() {
+    SQLFunctionUtils.createUDF(
+        "first_two_sum", 
"org.apache.iotdb.db.query.udf.example.relational.FirstTwoSum");
+    tableAssertTestFail(
+        "select first_two_sum(s1,s2) from table1 group by device_id",
+        "FirstTwoSum should accept three column as input",
+        DATABASE_NAME);
+    tableAssertTestFail(
+        "select first_two_sum(s1,s2,s3) from table1 group by device_id",
+        "FirstTwoSum should accept TIMESTAMP as the third input",
+        DATABASE_NAME);
+    tableAssertTestFail(
+        "select first_two_sum(s1,s10,time) from table1 group by device_id",
+        "FirstTwoSum should accept INT32, INT64, FLOAT, DOUBLE as the first 
two inputs",
+        DATABASE_NAME);
+  }
+
+  /**
+   * Registers the example {@code FirstTwoSum} class as {@code first_two_sum} and checks its
+   * per-device result, once with {@code time} and once with {@code s9} as the third argument.
+   * Both queries are expected to return the same rows.
+   *
+   * <p>The queries sort by {@code device_id} explicitly: the expected rows are listed in device
+   * order, and {@code GROUP BY} on its own does not promise any output order.
+   */
+  @Test
+  public void testFirstTwoSum() {
+    SQLFunctionUtils.createUDF(
+        "first_two_sum", "org.apache.iotdb.db.query.udf.example.relational.FirstTwoSum");
+    String[] expectedHeader = new String[] {"device_id", "sum"};
+    String[] retArray =
+        new String[] {
+          "d01,35030.0,",
+          "d02,50036.0,",
+          "d03,31036.0,",
+          "d04,40055.0,",
+          "d05,35030.0,",
+          "d06,50036.0,",
+          "d07,31036.0,",
+          "d08,40055.0,",
+          "d09,35030.0,",
+          "d10,50036.0,",
+          "d11,31036.0,",
+          "d12,40055.0,",
+          "d13,35030.0,",
+          "d14,50036.0,",
+          "d15,31036.0,",
+          "d16,40055.0,",
+        };
+
+    // Third argument is the time column.
+    tableResultSetEqualTest(
+        "select device_id, first_two_sum(s1, s2, time) as sum from table1 group by device_id order by device_id",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    // Third argument is the s9 column; the same rows are expected.
+    tableResultSetEqualTest(
+        "select device_id, first_two_sum(s1, s2, s9) as sum from table1 group by device_id order by device_id",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/scalar/IoTDBScalarFunctionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBUserDefinedScalarFunctionIT.java
similarity index 97%
rename from 
integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/scalar/IoTDBScalarFunctionIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBUserDefinedScalarFunctionIT.java
index c15adcb4c5d..7bb537ccb5c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/scalar/IoTDBScalarFunctionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBUserDefinedScalarFunctionIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.relational.it.db.it.udf.scalar;
+package org.apache.iotdb.relational.it.db.it.udf;
 
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -46,7 +46,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({TableLocalStandaloneIT.class, TableClusterIT.class})
-public class IoTDBScalarFunctionIT {
+public class IoTDBUserDefinedScalarFunctionIT {
   private static String[] sqls =
       new String[] {
         "CREATE DATABASE test",
@@ -66,13 +66,13 @@ public class IoTDBScalarFunctionIT {
       };
 
   @BeforeClass
-  public void setUp() throws Exception {
+  public static void setUp() throws Exception {
     EnvFactory.getEnv().initClusterEnvironment();
     insertData();
   }
 
   @AfterClass
-  public void tearDown() throws Exception {
+  public static void tearDown() throws Exception {
     EnvFactory.getEnv().cleanClusterEnvironment();
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
new file mode 100644
index 00000000000..34f57b2940b
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it.udf;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.Assert;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_AGG_FUNC;
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC;
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC;
+import static org.junit.Assert.fail;
+
+/** Test helpers that create and drop user-defined functions through the table-model SQL API. */
+public class SQLFunctionUtils {
+  /**
+   * Creates a function with {@code create function} and verifies via {@code show functions} that
+   * it is listed with the given class. Any exception fails the calling test.
+   *
+   * @param udfName name to register the function under
+   * @param classPath fully qualified name of the implementing class
+   */
+  public static void createUDF(String udfName, String classPath) {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(String.format("create function %s as '%s'", udfName, classPath));
+      // The listing is matched against the upper-cased name. Use a fixed locale so the match
+      // does not depend on the JVM default locale (e.g. the Turkish dotless i).
+      String expectedName = udfName.toUpperCase(java.util.Locale.ROOT);
+      boolean found = false;
+      try (ResultSet resultSet = statement.executeQuery("show functions")) {
+        while (resultSet.next()) {
+          // Known-non-null value on the left: a NULL column must not abort the scan with an NPE.
+          if (expectedName.equals(resultSet.getString(1))
+              && classPath.equals(resultSet.getString(3))) {
+            found = true;
+            break;
+          }
+        }
+      }
+      Assert.assertTrue("Can not find function", found);
+    } catch (Exception e) {
+      // Include the exception itself: getMessage() may be null and drops the exception type.
+      fail("Failed to create function " + udfName + ": " + e);
+    }
+  }
+
+  /**
+   * Drops every function that {@code show functions} reports with one of the user-defined
+   * function types (scalar, aggregate or table). Any exception fails the calling test.
+   */
+  public static void dropAllUDF() {
+    Set<String> userDefinedTypes =
+        new HashSet<>(
+            Arrays.asList(
+                FUNCTION_TYPE_USER_DEFINED_SCALAR_FUNC,
+                FUNCTION_TYPE_USER_DEFINED_AGG_FUNC,
+                FUNCTION_TYPE_USER_DEFINED_TABLE_FUNC));
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      // Collect the names first so the result set is closed before the drop statements run.
+      List<String> udfNames = new ArrayList<>();
+      try (ResultSet resultSet = statement.executeQuery("show functions")) {
+        while (resultSet.next()) {
+          if (userDefinedTypes.contains(resultSet.getString(2))) {
+            udfNames.add(resultSet.getString(1));
+          }
+        }
+      }
+      for (String name : udfNames) {
+        statement.execute(String.format("drop function %s", name));
+      }
+    } catch (Exception e) {
+      // Include the exception itself: getMessage() may be null and drops the exception type.
+      fail("Failed to drop user-defined functions: " + e);
+    }
+  }
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/config/AggregateFunctionConfig.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/config/AggregateFunctionConfig.java
new file mode 100644
index 00000000000..8257d76609a
--- /dev/null
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/config/AggregateFunctionConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.customizer.config;
+
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Configuration object handed to {@link AggregateFunction#beforeStart}. The function uses it to
+ * declare its output data type and whether it supports removing previously added input.
+ */
+public class AggregateFunctionConfig extends UDFConfigurations {
+
+  // Whether the function declares support for AggregateFunction#remove; false unless set.
+  private boolean isRemovable = false;
+
+  /**
+   * Set the output data type of the aggregate function.
+   *
+   * @param outputDataType the output data type of the aggregate function
+   * @return this
+   */
+  public AggregateFunctionConfig setOutputDataType(Type outputDataType) {
+    this.outputDataType = outputDataType;
+    return this;
+  }
+
+  /**
+   * Declare whether the aggregate function is removable. If it is, {@linkplain
+   * AggregateFunction#remove} should be implemented.
+   *
+   * @param removable whether the aggregate function is removable
+   */
+  public void setRemovable(boolean removable) {
+    isRemovable = removable;
+  }
+
+  /**
+   * Tell whether the aggregate function has been declared removable.
+   *
+   * @return the value last passed to {@link #setRemovable}, or {@code false} if it was never called
+   */
+  public boolean isRemovable() {
+    return isRemovable;
+  }
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
index 24942afd010..9fc263cf2e1 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
@@ -19,4 +19,83 @@
 
 package org.apache.iotdb.udf.api.relational;
 
-public interface AggregateFunction extends SQLFunction {}
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+/**
+ * Contract for a user-defined aggregate function. An implementation validates its parameters,
+ * declares its output type in {@link #beforeStart}, and then maintains a {@link State} that is
+ * fed with input rows, merged with other states and finally turned into a result value.
+ */
+public interface AggregateFunction extends SQLFunction {
+
+  /**
+   * This method is used to validate {@linkplain FunctionParameters}.
+   *
+   * @param parameters parameters used to validate
+   * @throws UDFException if any parameter is not valid
+   */
+  void validate(FunctionParameters parameters) throws UDFException;
+
+  /**
+   * This method is mainly used to initialize the {@linkplain AggregateFunction} and set the output
+   * data type. In this method, the user needs to do the following things:
+   *
+   * <ul>
+   *   <li>Use {@linkplain FunctionParameters} to get input data types and infer output data type.
+   *   <li>Use {@linkplain FunctionParameters} to get necessary attributes.
+   *   <li>Set the output data type in {@linkplain AggregateFunctionConfig}.
+   * </ul>
+   *
+   * <p>This method is called after the AggregateFunction is instantiated and before the beginning
+   * of the aggregation process.
+   *
+   * @param parameters used to parse the input parameters entered by the user
+   * @param configurations used to set the required properties of the AggregateFunction
+   */
+  void beforeStart(FunctionParameters parameters, AggregateFunctionConfig 
configurations);
+
+  /**
+   * Create and initialize a state. You may bind some resources in this method.
+   *
+   * @return the newly created state
+   */
+  State createState();
+
+  /**
+   * Update the state with one input row. You shall read the columns of the row and update the
+   * state with the raw values.
+   *
+   * @param state state to be updated
+   * @param input original input data row
+   */
+  void addInput(State state, Record input);
+
+  /**
+   * Merge two states in the execution engine.
+   *
+   * @param state current state (presumably the one that receives the merged result)
+   * @param rhs right-hand-side state to be merged
+   */
+  void combineState(State state, State rhs);
+
+  /**
+   * Calculate the output value from the final state.
+   *
+   * @param state final state
+   * @param resultValue used to collect the output value
+   */
+  void outputFinal(State state, ResultValue resultValue);
+
+  /**
+   * Remove input data from the state. This method is used to remove rows that have previously
+   * been added to the state. Once it is implemented, {@linkplain
+   * AggregateFunctionConfig#setRemovable} should be set to true.
+   *
+   * <p>The default implementation throws {@link UnsupportedOperationException}.
+   *
+   * @param state state to be updated
+   * @param input row to be removed
+   */
+  default void remove(State state, Record input) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * This method is mainly used to release the resources used in the SQLFunction. The default
+   * implementation does nothing.
+   */
+  default void beforeDestroy() {
+    // do nothing
+  }
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
index 558a12cb69c..06c3f0b46cb 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
@@ -19,9 +19,10 @@
 
 package org.apache.iotdb.udf.api.relational.access;
 
-import org.apache.iotdb.udf.api.type.Binary;
 import org.apache.iotdb.udf.api.type.Type;
 
+import org.apache.tsfile.utils.Binary;
+
 import java.time.LocalDate;
 
 public interface Record {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index 125fa756302..7eccd33ac61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -20,6 +20,8 @@
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.udf.utils.TableUDFUtils;
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.VarianceAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
@@ -35,13 +37,19 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggr
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedMinByAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedModeAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedUserDefinedAggregateAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedVarianceAccumulator;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.type.TypeFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST_BY;
@@ -60,7 +68,7 @@ public class AccumulatorFactory {
       String timeColumnName) {
     if (aggregationType == TAggregationType.UDAF) {
       // If UDAF accumulator receives raw input, it needs to check input's 
attribute
-      throw new UnsupportedOperationException();
+      return createUDAFAccumulator(functionName, inputDataTypes, 
inputAttributes);
     } else if ((LAST_BY.getFunctionName().equals(functionName)
             || FIRST_BY.getFunctionName().equals(functionName))
         && inputExpressions.size() > 1) {
@@ -99,13 +107,39 @@ public class AccumulatorFactory {
       boolean ascending) {
     if (aggregationType == TAggregationType.UDAF) {
       // If UDAF accumulator receives raw input, it needs to check input's 
attribute
-      throw new UnsupportedOperationException();
+      return createGroupedUDAFAccumulator(functionName, inputDataTypes, 
inputAttributes);
     } else {
       return createBuiltinGroupedAccumulator(
           aggregationType, inputDataTypes, inputExpressions, inputAttributes, 
ascending);
     }
   }
 
+  /**
+   * Creates the non-grouped accumulator backing a user-defined aggregate function.
+   *
+   * <p>The function is resolved by name and its {@code beforeStart} hook is invoked with the
+   * argument types and attributes of this call before the accumulator is built.
+   */
+  private static TableAccumulator createUDAFAccumulator(
+      String functionName, List<TSDataType> inputDataTypes, Map<String, String> inputAttributes) {
+    AggregateFunction udaf = TableUDFUtils.getAggregateFunction(functionName);
+    udaf.beforeStart(
+        new FunctionParameters(
+            UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes), inputAttributes),
+        new AggregateFunctionConfig());
+    return new UserDefinedAggregateFunctionAccumulator(
+        udaf, inputDataTypes.stream().map(TypeFactory::getType).collect(Collectors.toList()));
+  }
+
+  /**
+   * Creates the grouped accumulator backing a user-defined aggregate function.
+   *
+   * <p>The function is resolved by name and its {@code beforeStart} hook is invoked with the
+   * argument types and attributes of this call before the accumulator is built.
+   */
+  private static GroupedAccumulator createGroupedUDAFAccumulator(
+      String functionName, List<TSDataType> inputDataTypes, Map<String, String> inputAttributes) {
+    AggregateFunction udaf = TableUDFUtils.getAggregateFunction(functionName);
+    udaf.beforeStart(
+        new FunctionParameters(
+            UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes), inputAttributes),
+        new AggregateFunctionConfig());
+    return new GroupedUserDefinedAggregateAccumulator(
+        udaf, inputDataTypes.stream().map(TypeFactory::getType).collect(Collectors.toList()));
+  }
+
   private static GroupedAccumulator createBuiltinGroupedAccumulator(
       TAggregationType aggregationType,
       List<TSDataType> inputDataTypes,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
new file mode 100644
index 00000000000..58a5a011b77
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.udf.access.RecordIterator;
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Non-grouped {@link TableAccumulator} that delegates every aggregation step to a user-defined
+ * {@link AggregateFunction}.
+ *
+ * <p>One {@link State}, created by the function, holds the running result. Intermediate results
+ * are exchanged as the bytes produced by {@code State.serialize()}, carried in a binary column.
+ */
+public class UserDefinedAggregateFunctionAccumulator implements TableAccumulator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(UserDefinedAggregateFunctionAccumulator.class);
+  private final AggregateFunction aggregateFunction;
+  private final List<Type> inputDataTypes;
+  private final State state;
+
+  /**
+   * @param aggregateFunction user-defined function that creates the state and performs each step
+   * @param inputDataTypes types handed to the {@link RecordIterator} together with the input
+   *     columns
+   */
+  public UserDefinedAggregateFunctionAccumulator(
+      AggregateFunction aggregateFunction, List<Type> inputDataTypes) {
+    this.aggregateFunction = aggregateFunction;
+    this.inputDataTypes = inputDataTypes;
+    this.state = aggregateFunction.createState();
+  }
+
+  /** Returns the shallow size of this instance only; the user-defined state is not measured. */
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  /** Returns a new accumulator over the same function instance, with a freshly created state. */
+  @Override
+  public TableAccumulator copy() {
+    return new UserDefinedAggregateFunctionAccumulator(aggregateFunction, inputDataTypes);
+  }
+
+  /**
+   * Feeds every row of the raw input columns to the function's {@code addInput}.
+   *
+   * @param arguments input columns; the row count is read from the first column
+   */
+  @Override
+  public void addInput(Column[] arguments) {
+    RecordIterator iterator =
+        new RecordIterator(
+            Arrays.asList(arguments), inputDataTypes, arguments[0].getPositionCount());
+    while (iterator.hasNext()) {
+      aggregateFunction.addInput(state, iterator.next());
+    }
+  }
+
+  /**
+   * Merges serialized partial states into the running state.
+   *
+   * @param argument binary column (possibly run-length encoded) of serialized states
+   * @throws IllegalArgumentException if {@code argument} is not backed by a {@link BinaryColumn}
+   */
+  @Override
+  public void addIntermediate(Column argument) {
+    checkArgument(
+        argument instanceof BinaryColumn
+            || (argument instanceof RunLengthEncodedColumn
+                && ((RunLengthEncodedColumn) argument).getValue() instanceof BinaryColumn),
+        "intermediate input and output of UDAF should be BinaryColumn");
+    // One scratch state is reused for every position; it is reset before each deserialize.
+    State otherState = aggregateFunction.createState();
+    for (int i = 0; i < argument.getPositionCount(); i++) {
+      // Skip null positions, as the grouped UDAF accumulator does: there is no serialized state
+      // to read there.
+      if (argument.isNull(i)) {
+        continue;
+      }
+      otherState.reset();
+      Binary otherStateBinary = argument.getBinary(i);
+      otherState.deserialize(otherStateBinary.getValues());
+      aggregateFunction.combineState(state, otherState);
+    }
+  }
+
+  /**
+   * Writes the serialized running state as a single binary value.
+   *
+   * @throws IllegalArgumentException if {@code columnBuilder} is not a {@link BinaryColumnBuilder}
+   */
+  @Override
+  public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output of UDAF should be BinaryColumn");
+    byte[] bytes = state.serialize();
+    columnBuilder.writeBinary(new Binary(bytes));
+  }
+
+  /** Lets the function write its final result for the running state into the builder. */
+  @Override
+  public void evaluateFinal(ColumnBuilder columnBuilder) {
+    ResultValue resultValue = new ResultValue(columnBuilder);
+    aggregateFunction.outputFinal(state, resultValue);
+  }
+
+  /** Always reports {@code false}. */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  /**
+   * Not supported for user-defined aggregate functions.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void addStatistics(Statistics[] statistics) {
+    // UDAFs cannot be computed from statistics yet.
+    throw new UnsupportedOperationException("UDAF not support calculate from statistics now");
+  }
+
+  /** Resets the running state in place. */
+  @Override
+  public void reset() {
+    state.reset();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
new file mode 100644
index 00000000000..90a41d53866
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import org.apache.iotdb.commons.udf.access.RecordIterator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * {@link GroupedAccumulator} that delegates every aggregation step to a user-defined {@link
+ * AggregateFunction}, keeping one {@link State} per group id.
+ *
+ * <p>States are created lazily the first time a group id is touched. Intermediate results are
+ * exchanged as the bytes produced by {@code State.serialize()}, carried in a binary column.
+ */
+public class GroupedUserDefinedAggregateAccumulator implements GroupedAccumulator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(GroupedUserDefinedAggregateAccumulator.class);
+  private final AggregateFunction aggregateFunction;
+  private final ObjectBigArray<State> stateArray;
+  private final List<Type> inputDataTypes;
+
+  /**
+   * @param aggregateFunction user-defined function that creates states and performs each step
+   * @param inputDataTypes types handed to the {@link RecordIterator} together with the input
+   *     columns
+   */
+  public GroupedUserDefinedAggregateAccumulator(
+      AggregateFunction aggregateFunction, List<Type> inputDataTypes) {
+    this.aggregateFunction = aggregateFunction;
+    this.stateArray = new ObjectBigArray<>();
+    this.inputDataTypes = inputDataTypes;
+  }
+
+  /** Returns the shallow size of this instance only. */
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  /** Grows the per-group state array so that it can hold {@code groupCount} entries. */
+  @Override
+  public void setGroupCount(long groupCount) {
+    stateArray.ensureCapacity(groupCount);
+  }
+
+  /** Returns the state stored for {@code groupId}, creating and storing one if none exists. */
+  private State getOrCreateState(int groupId) {
+    State existing = stateArray.get(groupId);
+    if (existing != null) {
+      return existing;
+    }
+    State created = aggregateFunction.createState();
+    stateArray.set(groupId, created);
+    return created;
+  }
+
+  /**
+   * Feeds every input row to the function, using the state of the group at the same position in
+   * {@code groupIds}.
+   *
+   * @param groupIds group id of each row
+   * @param arguments input columns; the row count is read from the first column
+   */
+  @Override
+  public void addInput(int[] groupIds, Column[] arguments) {
+    RecordIterator rows =
+        new RecordIterator(
+            Arrays.asList(arguments), inputDataTypes, arguments[0].getPositionCount());
+    for (int row = 0; rows.hasNext(); row++) {
+      aggregateFunction.addInput(getOrCreateState(groupIds[row]), rows.next());
+    }
+  }
+
+  /**
+   * Merges serialized partial states into the state of the matching group; null positions are
+   * skipped.
+   *
+   * @param groupIds group id of each position
+   * @param argument binary column (possibly run-length encoded) of serialized states
+   * @throws IllegalArgumentException if {@code argument} is not backed by a {@link BinaryColumn}
+   */
+  @Override
+  public void addIntermediate(int[] groupIds, Column argument) {
+    boolean binaryBacked =
+        argument instanceof BinaryColumn
+            || (argument instanceof RunLengthEncodedColumn
+                && ((RunLengthEncodedColumn) argument).getValue() instanceof BinaryColumn);
+    checkArgument(binaryBacked, "intermediate input and output of UDAF should be BinaryColumn");
+
+    for (int position = 0; position < groupIds.length; position++) {
+      if (argument.isNull(position)) {
+        continue;
+      }
+      State partial = aggregateFunction.createState();
+      partial.deserialize(argument.getBinary(position).getValues());
+      aggregateFunction.combineState(getOrCreateState(groupIds[position]), partial);
+    }
+  }
+
+  /**
+   * Writes the serialized state of {@code groupId} as a single binary value.
+   *
+   * @throws IllegalArgumentException if {@code columnBuilder} is not a {@link BinaryColumnBuilder}
+   * @throws IllegalStateException if no state has been created for {@code groupId}
+   */
+  @Override
+  public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output of UDAF should be BinaryColumn");
+    State groupState = stateArray.get(groupId);
+    if (groupState == null) {
+      throw new IllegalStateException(String.format("State for group %d is not found", groupId));
+    }
+    columnBuilder.writeBinary(new Binary(groupState.serialize()));
+  }
+
+  /**
+   * Lets the function write the final result of {@code groupId} into the builder. A state is
+   * created first if the group has none.
+   */
+  @Override
+  public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+    aggregateFunction.outputFinal(getOrCreateState(groupId), new ResultValue(columnBuilder));
+  }
+
+  /** No preparation is required before final evaluation. */
+  @Override
+  public void prepareFinal() {
+    // do nothing
+  }
+
+  /** Resets the per-group state array. */
+  @Override
+  public void reset() {
+    stateArray.reset();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java
index f786bb1ceff..f8cec46dfd1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionTreeUtils.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
 
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction;
+import org.apache.iotdb.commons.udf.utils.TableUDFUtils;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultExpressionTraversalVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -92,8 +93,8 @@ public final class ExpressionTreeUtils {
   }
 
   static boolean isAggregationFunction(String functionName) {
-    // TODO consider UDAF
+    // An aggregation is either a built-in aggregate or a user-defined aggregate function.
+    // Built-in names are matched in lower case; a fixed locale keeps the result independent of
+    // the JVM default locale (under a Turkish locale "I" lower-cases to a dotless i).
     return TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName()
-        .contains(functionName.toLowerCase());
+            .contains(functionName.toLowerCase(java.util.Locale.ENGLISH))
+        || TableUDFUtils.isAggregateFunction(functionName);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index c974eff5414..4edd90bae4b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -50,8 +50,10 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundExceptio
 import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
 import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig;
 import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -640,7 +642,6 @@ public class TableMetadataImpl implements Metadata {
     }
 
     // User-defined scalar function
-
     if (TableUDFUtils.isScalarFunction(functionName)) {
       ScalarFunction scalarFunction = 
TableUDFUtils.getScalarFunction(functionName);
       FunctionParameters functionParameters =
@@ -659,10 +660,26 @@ public class TableMetadataImpl implements Metadata {
       } finally {
         scalarFunction.beforeDestroy();
       }
+    } else if (TableUDFUtils.isAggregateFunction(functionName)) {
+      AggregateFunction aggregateFunction = 
TableUDFUtils.getAggregateFunction(functionName);
+      FunctionParameters functionParameters =
+          new FunctionParameters(
+              argumentTypes.stream()
+                  .map(UDFDataTypeTransformer::transformReadTypeToUDFDataType)
+                  .collect(Collectors.toList()),
+              Collections.emptyMap());
+      try {
+        aggregateFunction.validate(functionParameters);
+        AggregateFunctionConfig config = new AggregateFunctionConfig();
+        aggregateFunction.beforeStart(functionParameters, config);
+        return 
UDFDataTypeTransformer.transformUDFDataTypeToReadType(config.getOutputDataType());
+      } catch (Exception e) {
+        throw new SemanticException("Invalid function parameters: " + 
e.getMessage());
+      } finally {
+        aggregateFunction.beforeDestroy();
+      }
     }
 
-    // TODO UDAF
-
     throw new SemanticException("Unknown function: " + functionName);
   }
 
@@ -670,7 +687,8 @@ public class TableMetadataImpl implements Metadata {
   public boolean isAggregationFunction(
       final SessionInfo session, final String functionName, final 
AccessControl accessControl) {
+    // A name is an aggregation if it is a built-in aggregate (compared in lower case, English
+    // locale) or an aggregate function reported by TableUDFUtils.
     return TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName()
-        .contains(functionName.toLowerCase(Locale.ENGLISH));
+            .contains(functionName.toLowerCase(Locale.ENGLISH))
+        || TableUDFUtils.isAggregateFunction(functionName);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
index 844744dbe79..8fff6f2eb23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
@@ -62,8 +62,6 @@ public class Util {
               resolvedFunction.getSignature().getArgumentTypes());
       Symbol intermediateSymbol =
           symbolAllocator.newSymbol(resolvedFunction.getSignature().getName(), 
intermediateType);
-      // TODO put symbol and its type to TypeProvide or later process: add all 
map contents of
-      // SymbolAllocator to the TypeProvider
       checkState(
           !originalAggregation.getOrderingScheme().isPresent(),
           "Aggregate with ORDER BY does not support partial aggregation");
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
index 1155e865bb5..79d871107be 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
@@ -21,10 +21,11 @@ package org.apache.iotdb.commons.udf.access;
 
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
 import org.apache.iotdb.udf.api.relational.access.Record;
-import org.apache.iotdb.udf.api.type.Binary;
 import org.apache.iotdb.udf.api.type.Type;
 
 import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.DateUtils;
 
 import java.time.LocalDate;
@@ -87,12 +88,15 @@ public class RecordIterator implements Iterator<Record> {
 
       @Override
       public Binary getBinary(int columnIndex) {
-        return new 
Binary(childrenColumns.get(columnIndex).getBinary(index).getValues());
+        // Return the column's Binary at position `index` as-is, without re-wrapping its bytes.
+        return childrenColumns.get(columnIndex).getBinary(index);
       }
 
       @Override
       public String getString(int columnIndex) {
-        return childrenColumns.get(columnIndex).getBinary(index).toString();
+        // Decode the bytes at position `index` with TSFileConfig.STRING_CHARSET explicitly
+        // instead of going through Binary#toString().
+        return childrenColumns
+            .get(columnIndex)
+            .getBinary(index)
+            .getStringValue(TSFileConfig.STRING_CHARSET);
       }
 
       @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
index 49db27110c7..4cd046dfeb6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.tsfile.read.common.type.BlobType.BLOB;
 import static org.apache.tsfile.read.common.type.DoubleType.DOUBLE;
 import static org.apache.tsfile.read.common.type.LongType.INT64;
 
@@ -103,7 +104,8 @@ public enum TableBuiltinAggregationFunction {
       case "min":
         return originalArgumentTypes.get(0);
       default:
-        throw new IllegalArgumentException("Invalid Aggregation function: " + 
name);
+        // default is UDAF
+        return BLOB;
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java
index 97548ec105b..03ba31e5a4d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TableUDFUtils.java
@@ -37,19 +37,19 @@ public class TableUDFUtils {
   public static boolean isScalarFunction(String functionName) {
     UDFInformation information =
         UDFManagementService.getInstance().getUDFInformation(Model.TABLE, 
functionName);
-    return FunctionType.SCALAR.equals(information.getUdfType().getType());
+    // The lookup may yield null (presumably when no table-model UDF has this name); report
+    // false in that case instead of dereferencing it.
+    return information != null && 
FunctionType.SCALAR.equals(information.getUdfType().getType());
   }
 
   public static boolean isAggregateFunction(String functionName) {
     UDFInformation information =
         UDFManagementService.getInstance().getUDFInformation(Model.TABLE, 
functionName);
-    return FunctionType.AGGREGATE.equals(information.getUdfType().getType());
+    // The lookup may yield null (presumably when no table-model UDF has this name); report
+    // false in that case instead of dereferencing it.
+    return information != null && 
FunctionType.AGGREGATE.equals(information.getUdfType().getType());
   }
 
   public static boolean isTableFunction(String functionName) {
     UDFInformation information =
         UDFManagementService.getInstance().getUDFInformation(Model.TABLE, 
functionName);
-    return FunctionType.TABLE.equals(information.getUdfType().getType());
+    // The lookup may yield null (presumably when no table-model UDF has this name); report
+    // false in that case instead of dereferencing it.
+    return information != null && 
FunctionType.TABLE.equals(information.getUdfType().getType());
   }
 
   public static ScalarFunction getScalarFunction(String functionName) throws 
UDFException {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java
index 536e39db955..83792ebf66a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java
@@ -23,11 +23,15 @@ import org.apache.iotdb.udf.api.type.Type;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.BinaryType;
+import org.apache.tsfile.read.common.type.BlobType;
 import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.DateType;
 import org.apache.tsfile.read.common.type.DoubleType;
 import org.apache.tsfile.read.common.type.FloatType;
 import org.apache.tsfile.read.common.type.IntType;
 import org.apache.tsfile.read.common.type.LongType;
+import org.apache.tsfile.read.common.type.StringType;
+import org.apache.tsfile.read.common.type.TimestampType;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -53,37 +57,6 @@ public class UDFDataTypeTransformer {
             .collect(Collectors.toList());
   }
 
-  public static TSDataType transformReadTypeToTSDataType(
-      org.apache.tsfile.read.common.type.Type type) {
-    if (type == null) {
-      return null;
-    }
-    switch (type.getTypeEnum()) {
-      case BOOLEAN:
-        return TSDataType.BOOLEAN;
-      case INT32:
-        return TSDataType.INT32;
-      case INT64:
-        return TSDataType.INT64;
-      case FLOAT:
-        return TSDataType.FLOAT;
-      case DOUBLE:
-        return TSDataType.DOUBLE;
-      case TEXT:
-        return TSDataType.TEXT;
-      case TIMESTAMP:
-        return TSDataType.TIMESTAMP;
-      case DATE:
-        return TSDataType.DATE;
-      case BLOB:
-        return TSDataType.BLOB;
-      case STRING:
-        return TSDataType.STRING;
-      default:
-        throw new IllegalArgumentException("Invalid input: " + type);
-    }
-  }
-
   public static Type 
transformReadTypeToUDFDataType(org.apache.tsfile.read.common.type.Type type) {
     if (type == null) {
       return null;
@@ -122,19 +95,23 @@ public class UDFDataTypeTransformer {
       case BOOLEAN:
         return BooleanType.BOOLEAN;
       case INT32:
-      case DATE:
         return IntType.INT32;
+      case DATE:
+        return DateType.DATE;
       case INT64:
-      case TIMESTAMP:
         return LongType.INT64;
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
       case FLOAT:
         return FloatType.FLOAT;
       case DOUBLE:
         return DoubleType.DOUBLE;
       case TEXT:
+        return BinaryType.TEXT;
       case BLOB:
+        return BlobType.BLOB;
       case STRING:
-        return BinaryType.TEXT;
+        return StringType.STRING;
       default:
         throw new IllegalArgumentException("Invalid input: " + type);
     }

Reply via email to