This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 294395128b9206dd2c28ad4b2b2b98ac78a09e63
Author: Jackeyzhe <[email protected]>
AuthorDate: Wed Feb 11 11:34:54 2026 +0800

    [kv] Validate column data types for aggregation function during table 
creation (#2323)
---
 .../fluss/client/admin/FlussAdminITCase.java       | 27 +++++++
 .../org/apache/fluss/metadata/AggFunction.java     | 14 +++-
 .../org/apache/fluss/metadata/AggFunctionType.java | 89 +++++++++++++++++++++-
 .../org/apache/fluss/metadata/AggFunctions.java    |  3 +
 .../apache/fluss/metadata/TableDescriptorTest.java | 45 ++++++++++-
 .../fluss/flink/utils/FlinkAggFunctionParser.java  | 15 +++-
 .../apache/fluss/flink/utils/FlinkConversions.java |  2 +-
 .../flink/utils/FlinkAggFunctionParserTest.java    | 38 ++++++---
 .../aggregate/factory/FieldListaggAggFactory.java  |  2 +-
 .../aggregate/factory/FieldStringAggFactory.java   |  2 +-
 .../server/utils/TableDescriptorValidation.java    | 12 ++-
 .../docs/table-design/merge-engines/aggregation.md | 18 ++---
 12 files changed, 229 insertions(+), 38 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 063fad65b..db1c0f01c 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1711,6 +1711,33 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
         admin.dropTable(tablePath, false).get();
     }
 
+    @Test
+    public void testCreateTableWithInvalidAggFunctionDataType() throws 
Exception {
+        TablePath tablePath =
+                TablePath.of(
+                        DEFAULT_TABLE_PATH.getDatabaseName(),
+                        "test_invalid_data_type_for_aggfunction");
+        Map<String, String> propertiesAggregate = new HashMap<>();
+        propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), 
"aggregation");
+
+        Schema schema1 =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("sum_value", DataTypes.STRING(), 
AggFunctions.SUM())
+                        .primaryKey("id")
+                        .build();
+        TableDescriptor t1 =
+                TableDescriptor.builder()
+                        .schema(schema1)
+                        .comment("aggregate merge engine table")
+                        .properties(propertiesAggregate)
+                        .build();
+        assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())
+                .cause()
+                .isInstanceOf(InvalidConfigException.class)
+                .hasMessageContaining("Data type for sum column must be");
+    }
+
     /**
      * Test that aggregate merge engine tables cannot use WAL changelog image 
mode.
      *
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
index 996b3d3e3..1f31a8c22 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.metadata;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.types.DataType;
 
 import javax.annotation.Nullable;
 
@@ -103,6 +104,17 @@ public final class AggFunction implements Serializable {
         return !parameters.isEmpty();
     }
 
+    /**
+     * Validates data type of this aggregation function. This method checks 
that the field data type
+     * is valid.
+     *
+     * @param fieldType the field data type
+     * @throws IllegalArgumentException if the data type is invalid
+     */
+    public void validateDataType(DataType fieldType) {
+        type.validateDataType(fieldType);
+    }
+
     /**
      * Validates all parameters of this aggregation function.
      *
@@ -115,7 +127,7 @@ public final class AggFunction implements Serializable {
      *
      * @throws IllegalArgumentException if any parameter is invalid
      */
-    public void validate() {
+    public void validateParameters() {
         for (Map.Entry<String, String> entry : parameters.entrySet()) {
             type.validateParameter(entry.getKey(), entry.getValue());
         }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
index a8a5e56bc..f0026fbba 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
@@ -18,11 +18,17 @@
 package org.apache.fluss.metadata;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Set;
 
+import static org.apache.fluss.metadata.AggFunctions.PARAM_DELIMITER;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
 /**
  * Aggregation function type for aggregate merge engine.
  *
@@ -55,8 +61,35 @@ public enum AggFunctionType {
     RBM32,
     RBM64;
 
-    /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. 
*/
-    public static final String PARAM_DELIMITER = "delimiter";
+    // 
------------------------------------------------------------------------------------------
+
+    static final DataTypeRoot[] NUMERIC_TYPES =
+            new DataTypeRoot[] {
+                DataTypeRoot.TINYINT,
+                DataTypeRoot.SMALLINT,
+                DataTypeRoot.INTEGER,
+                DataTypeRoot.BIGINT,
+                DataTypeRoot.FLOAT,
+                DataTypeRoot.DOUBLE,
+                DataTypeRoot.DECIMAL
+            };
+
+    static final DataTypeRoot[] MAX_MIN_TYPES =
+            new DataTypeRoot[] {
+                DataTypeRoot.CHAR,
+                DataTypeRoot.STRING,
+                DataTypeRoot.TINYINT,
+                DataTypeRoot.SMALLINT,
+                DataTypeRoot.INTEGER,
+                DataTypeRoot.BIGINT,
+                DataTypeRoot.FLOAT,
+                DataTypeRoot.DOUBLE,
+                DataTypeRoot.DECIMAL,
+                DataTypeRoot.DATE,
+                DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
+                DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+            };
 
     /**
      * Returns the set of supported parameter names for this aggregation 
function.
@@ -75,6 +108,41 @@ public enum AggFunctionType {
         }
     }
 
+    /**
+     * Returns the supported data type roots for this aggregation function.
+     *
+     * @return an array of supported DataTypeRoot values
+     */
+    public DataTypeRoot[] getSupportedDataTypeRoots() {
+        switch (this) {
+            case BOOL_AND:
+            case BOOL_OR:
+                return new DataTypeRoot[] {DataTypeRoot.BOOLEAN};
+            case RBM32:
+            case RBM64:
+                return new DataTypeRoot[] {DataTypeRoot.BYTES};
+            case LISTAGG:
+            case STRING_AGG:
+                return new DataTypeRoot[] {DataTypeRoot.STRING, 
DataTypeRoot.CHAR};
+            case SUM:
+            case PRODUCT:
+                return NUMERIC_TYPES;
+
+            case MAX:
+            case MIN:
+                return MAX_MIN_TYPES;
+
+            case LAST_VALUE:
+            case LAST_VALUE_IGNORE_NULLS:
+            case FIRST_VALUE:
+            case FIRST_VALUE_IGNORE_NULLS:
+                // all data types are supported
+                return DataTypeRoot.values();
+            default:
+                throw new IllegalStateException("Unsupported aggregation 
function type: " + this);
+        }
+    }
+
     /**
      * Validates a parameter value for this aggregation function.
      *
@@ -100,7 +168,7 @@ public enum AggFunctionType {
         switch (this) {
             case LISTAGG:
             case STRING_AGG:
-                if (PARAM_DELIMITER.equals(parameterName)) {
+                if (AggFunctions.PARAM_DELIMITER.equals(parameterName)) {
                     if (parameterValue == null || parameterValue.isEmpty()) {
                         throw new IllegalArgumentException(
                                 String.format(
@@ -115,6 +183,21 @@ public enum AggFunctionType {
         }
     }
 
+    /**
+     * Validates a data type for this aggregation function.
+     *
+     * @param fieldType the field data type
+     * @throws IllegalArgumentException if the data type is invalid
+     */
+    public void validateDataType(DataType fieldType) {
+        checkArgument(
+                fieldType.isAnyOf(getSupportedDataTypeRoots()),
+                "Data type for %s column must be part of %s but was '%s'.",
+                toString(),
+                Arrays.deepToString(getSupportedDataTypeRoots()),
+                fieldType);
+    }
+
     /**
      * Converts a string to an AggFunctionType enum value.
      *
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
index 3fb647158..86a704ef6 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
@@ -171,6 +171,9 @@ public final class AggFunctions {
      */
     public static final String DEFAULT_LISTAGG_DELIMITER = ",";
 
+    /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. 
*/
+    public static final String PARAM_DELIMITER = "delimiter";
+
     /**
      * Creates a LISTAGG aggregation function with default comma delimiter.
      *
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java 
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
index 430d47d90..bd9961075 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
@@ -23,6 +23,7 @@ import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -321,7 +322,7 @@ class TableDescriptorTest {
     @Test
     void testInvalidListaggParameterEmptyDelimiter() {
         // LISTAGG with empty delimiter - should fail
-        assertThatThrownBy(() -> AggFunctions.LISTAGG("").validate())
+        assertThatThrownBy(() -> AggFunctions.LISTAGG("").validateParameters())
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("must be a non-empty string");
     }
@@ -332,7 +333,8 @@ class TableDescriptorTest {
         Map<String, String> params = new HashMap<>();
         params.put("unknown_param", "value");
 
-        assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.LISTAGG, 
params).validate())
+        assertThatThrownBy(
+                        () -> AggFunctions.of(AggFunctionType.LISTAGG, 
params).validateParameters())
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("unknown_param")
                 .hasMessageContaining("not supported");
@@ -344,9 +346,46 @@ class TableDescriptorTest {
         Map<String, String> params = new HashMap<>();
         params.put("some_param", "value");
 
-        assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, 
params).validate())
+        assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, 
params).validateParameters())
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("some_param")
                 .hasMessageContaining("not supported");
     }
+
+    @Test
+    void testValidateAggFunctionWithDataType() {
+        Map<String, String> params = new HashMap<>();
+
+        // invalid cases
+        assertThatThrownBy(
+                        () ->
+                                AggFunctions.of(AggFunctionType.BOOL_AND, 
params)
+                                        .validateDataType(DataTypes.STRING()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("column must be part of")
+                .hasMessageContaining(
+                        
Arrays.deepToString(AggFunctionType.BOOL_AND.getSupportedDataTypeRoots()));
+
+        assertThatThrownBy(
+                        () ->
+                                AggFunctions.of(AggFunctionType.SUM, params)
+                                        .validateDataType(DataTypes.STRING()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("column must be part of")
+                .hasMessageContaining(
+                        
Arrays.deepToString(AggFunctionType.SUM.getSupportedDataTypeRoots()));
+
+        assertThatThrownBy(
+                        () ->
+                                AggFunctions.of(AggFunctionType.MAX, params)
+                                        .validateDataType(DataTypes.BOOLEAN()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("column must be part of")
+                .hasMessageContaining(
+                        
Arrays.deepToString(AggFunctionType.MAX.getSupportedDataTypeRoots()));
+
+        // valid cases
+        AggFunctions.of(AggFunctionType.LAST_VALUE, 
params).validateDataType(DataTypes.STRING());
+        AggFunctions.of(AggFunctionType.LISTAGG, 
params).validateDataType(DataTypes.STRING());
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
index a781a75d5..ad1785a98 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.metadata.AggFunction;
 import org.apache.fluss.metadata.AggFunctionType;
 import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.types.DataType;
 
 import org.apache.flink.configuration.Configuration;
 
@@ -59,7 +60,8 @@ public class FlinkAggFunctionParser {
      *
      * <p>Returns empty if no aggregation function is configured for this 
column.
      */
-    public static Optional<AggFunction> parseAggFunction(String columnName, 
Configuration options) {
+    public static Optional<AggFunction> parseAggFunction(
+            String columnName, DataType dataType, Configuration options) {
 
         // Check column-level configuration: fields.<column>.agg
         String columnFuncKey = AGG_PREFIX + columnName + AGG_SUFFIX;
@@ -75,7 +77,7 @@ public class FlinkAggFunctionParser {
         Map<String, String> params =
                 collectParameters(AGG_PREFIX + columnName + "." + funcName + 
".", options);
 
-        return Optional.of(createAggFunction(type, params, columnName));
+        return Optional.of(createAggFunction(type, params, columnName, 
dataType));
     }
 
     /**
@@ -182,10 +184,14 @@ public class FlinkAggFunctionParser {
      * @param type the aggregation function type
      * @param params the function parameters (may be empty)
      * @param columnName the column name (used for error messages)
+     * @param dataType the column data type (used for validation)
      * @return the created aggregation function
      */
     private static AggFunction createAggFunction(
-            AggFunctionType type, Map<String, String> params, String 
columnName) {
+            AggFunctionType type,
+            Map<String, String> params,
+            String columnName,
+            DataType dataType) {
         // Use generic factory method to create aggregation function
         // This delegates parameter handling and validation to the underlying 
implementation
         AggFunction aggFunction =
@@ -193,7 +199,8 @@ public class FlinkAggFunctionParser {
 
         // Validate all parameters to ensure they are supported and valid
         try {
-            aggFunction.validate();
+            aggFunction.validateParameters();
+            aggFunction.validateDataType(dataType);
         } catch (IllegalArgumentException e) {
             throw new IllegalArgumentException(
                     String.format(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index f2d34dc14..2c908e128 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -691,7 +691,7 @@ public class FlinkConversions {
         // Parse and add aggregation function if needed
         if (parseAggFunction) {
             Optional<AggFunction> aggFunction =
-                    FlinkAggFunctionParser.parseAggFunction(columnName, 
tableConf);
+                    FlinkAggFunctionParser.parseAggFunction(columnName, 
flussDataType, tableConf);
             if (aggFunction.isPresent()) {
                 schemaBuilder.column(columnName, flussDataType, 
aggFunction.get());
             } else {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
index 62e861da7..fe019a08a 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.utils;
 import org.apache.fluss.metadata.AggFunction;
 import org.apache.fluss.metadata.AggFunctionType;
 import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
@@ -37,7 +38,8 @@ class FlinkAggFunctionParserTest {
     @Test
     void testParseNoAggFunction() {
         Configuration options = new Configuration();
-        assertThat(FlinkAggFunctionParser.parseAggFunction("total", 
options)).isEmpty();
+        assertThat(FlinkAggFunctionParser.parseAggFunction("total", 
DataTypes.INT(), options))
+                .isEmpty();
     }
 
     @Test
@@ -45,7 +47,8 @@ class FlinkAggFunctionParserTest {
         Configuration options = new Configuration();
         options.setString("fields.total.agg", "sum");
 
-        Optional<AggFunction> result = 
FlinkAggFunctionParser.parseAggFunction("total", options);
+        Optional<AggFunction> result =
+                FlinkAggFunctionParser.parseAggFunction("total", 
DataTypes.INT(), options);
 
         assertThat(result).isPresent();
         assertThat(result.get().getType()).isEqualTo(AggFunctionType.SUM);
@@ -58,7 +61,8 @@ class FlinkAggFunctionParserTest {
         options.setString("fields.tags.agg", "listagg");
         options.setString("fields.tags.listagg.delimiter", ";");
 
-        Optional<AggFunction> result = 
FlinkAggFunctionParser.parseAggFunction("tags", options);
+        Optional<AggFunction> result =
+                FlinkAggFunctionParser.parseAggFunction("tags", 
DataTypes.STRING(), options);
 
         assertThat(result).isPresent();
         assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
@@ -76,7 +80,8 @@ class FlinkAggFunctionParserTest {
         options.setString("fields.tags.agg", "LISTAGG");
         options.setString("fields.tags.listagg.delimiter", ";");
 
-        Optional<AggFunction> result = 
FlinkAggFunctionParser.parseAggFunction("tags", options);
+        Optional<AggFunction> result =
+                FlinkAggFunctionParser.parseAggFunction("tags", 
DataTypes.STRING(), options);
 
         assertThat(result).isPresent();
         assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
@@ -91,9 +96,12 @@ class FlinkAggFunctionParserTest {
         options.setString("fields.col2.agg", "listagg");
         options.setString("fields.col2.listagg.delimiter", "|"); // This 
should not affect col1
 
-        Optional<AggFunction> col1Func = 
FlinkAggFunctionParser.parseAggFunction("col1", options);
-        Optional<AggFunction> col2Func = 
FlinkAggFunctionParser.parseAggFunction("col2", options);
-        Optional<AggFunction> col3Func = 
FlinkAggFunctionParser.parseAggFunction("col3", options);
+        Optional<AggFunction> col1Func =
+                FlinkAggFunctionParser.parseAggFunction("col1", 
DataTypes.INT(), options);
+        Optional<AggFunction> col2Func =
+                FlinkAggFunctionParser.parseAggFunction("col2", 
DataTypes.STRING(), options);
+        Optional<AggFunction> col3Func =
+                FlinkAggFunctionParser.parseAggFunction("col3", 
DataTypes.STRING(), options);
 
         // col1 should have SUM without parameters
         assertThat(col1Func).isPresent();
@@ -114,7 +122,10 @@ class FlinkAggFunctionParserTest {
         Configuration options = new Configuration();
         options.setString("fields.total.agg", "invalid_function");
 
-        assertThatThrownBy(() -> 
FlinkAggFunctionParser.parseAggFunction("total", options))
+        assertThatThrownBy(
+                        () ->
+                                FlinkAggFunctionParser.parseAggFunction(
+                                        "total", DataTypes.STRING(), options))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("Unknown aggregation function")
                 .hasMessageContaining("invalid_function");
@@ -125,7 +136,10 @@ class FlinkAggFunctionParserTest {
         Configuration options = new Configuration();
         options.setString("fields.total.agg", "");
 
-        assertThatThrownBy(() -> 
FlinkAggFunctionParser.parseAggFunction("total", options))
+        assertThatThrownBy(
+                        () ->
+                                FlinkAggFunctionParser.parseAggFunction(
+                                        "total", DataTypes.STRING(), options))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("Empty aggregation function name");
     }
@@ -142,8 +156,10 @@ class FlinkAggFunctionParserTest {
 
         // Parse them back
         Configuration config = Configuration.fromMap(options);
-        Optional<AggFunction> col1Func = 
FlinkAggFunctionParser.parseAggFunction("col1", config);
-        Optional<AggFunction> col2Func = 
FlinkAggFunctionParser.parseAggFunction("col2", config);
+        Optional<AggFunction> col1Func =
+                FlinkAggFunctionParser.parseAggFunction("col1", 
DataTypes.INT(), config);
+        Optional<AggFunction> col2Func =
+                FlinkAggFunctionParser.parseAggFunction("col2", 
DataTypes.STRING(), config);
 
         // Verify they match the original functions
         assertThat(col1Func).isPresent();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
index aeb4b01d6..5c1a42b3f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
@@ -42,7 +42,7 @@ public class FieldListaggAggFactory implements 
FieldAggregatorFactory {
                 fieldType);
 
         // Get delimiter from function parameters, default to comma
-        String delimiter = 
aggFunction.getParameter(AggFunctionType.PARAM_DELIMITER);
+        String delimiter = 
aggFunction.getParameter(AggFunctions.PARAM_DELIMITER);
         if (delimiter == null) {
             delimiter = AggFunctions.DEFAULT_LISTAGG_DELIMITER;
         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
index 784776f36..791bbba75 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
@@ -48,7 +48,7 @@ public class FieldStringAggFactory implements 
FieldAggregatorFactory {
                 fieldType);
 
         // Get delimiter from function parameters, default to comma
-        String delimiter = 
aggFunction.getParameter(AggFunctionType.PARAM_DELIMITER);
+        String delimiter = 
aggFunction.getParameter(AggFunctions.PARAM_DELIMITER);
         if (delimiter == null) {
             delimiter = AggFunctions.DEFAULT_LISTAGG_DELIMITER;
         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 760ad50ae..0c8e9ef35 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -322,11 +322,13 @@ public class TableDescriptorValidation {
     /**
      * Validates aggregation function parameters in the schema.
      *
-     * <p>This method delegates to {@link AggFunction#validate()} to ensure 
all parameters are valid
+     * <p>This method delegates to {@link AggFunction#validateParameters()} to 
ensure all parameters
+     * are valid and {@link AggFunction#validateDataType(DataType)} to ensure 
data types are valid
      * according to the function's requirements.
      *
      * @param schema the schema to validate
-     * @throws InvalidConfigException if any aggregation function has invalid 
parameters
+     * @throws InvalidConfigException if any aggregation function has invalid 
parameters or data
+     *     types
      */
     private static void validateAggregationFunctionParameters(Schema schema) {
         // Get primary key columns for early exit
@@ -343,9 +345,11 @@ public class TableDescriptorValidation {
                 continue;
             }
 
-            // Validate aggregation function parameters
+            // Validate aggregation function parameters and data type
             try {
-                aggFunctionOpt.get().validate();
+                AggFunction aggFunction = aggFunctionOpt.get();
+                aggFunction.validateParameters();
+                aggFunction.validateDataType(column.getDataType());
             } catch (IllegalArgumentException e) {
                 throw new InvalidConfigException(
                         String.format(
diff --git a/website/docs/table-design/merge-engines/aggregation.md 
b/website/docs/table-design/merge-engines/aggregation.md
index 4523e830a..7d585f210 100644
--- a/website/docs/table-design/merge-engines/aggregation.md
+++ b/website/docs/table-design/merge-engines/aggregation.md
@@ -265,7 +265,7 @@ TableDescriptor.builder()
 
 Computes the product of values across multiple rows.
 
-- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, 
`DOUBLE`, `DECIMAL`
 - **Behavior**: Multiplies incoming values with the accumulator
 - **Null Handling**: Null values are ignored
 
@@ -321,7 +321,7 @@ TableDescriptor.builder()
 
 Identifies and retains the maximum value.
 
-- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, 
`BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, 
`TIMESTAMP_LTZ`
 - **Behavior**: Keeps the larger value between accumulator and incoming value
 - **Null Handling**: Null values are ignored
 
@@ -379,7 +379,7 @@ TableDescriptor.builder()
 
 Identifies and retains the minimum value.
 
-- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, 
`BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, 
`TIMESTAMP_LTZ`
 - **Behavior**: Keeps the smaller value between accumulator and incoming value
 - **Null Handling**: Null values are ignored
 
@@ -702,7 +702,7 @@ TableDescriptor.builder()
 
 Concatenates multiple string values into a single string with a delimiter.
 
-- **Supported Data Types**: STRING, CHAR
+- **Supported Data Types**: `STRING`, `CHAR`
 - **Behavior**: Concatenates values using the specified delimiter
 - **Null Handling**: Null values are skipped
 - **Delimiter**: Specify delimiter directly in the aggregation function 
(default is comma `,`)
@@ -764,7 +764,7 @@ TableDescriptor.builder()
 
 Alias for `listagg`. Concatenates multiple string values into a single string 
with a delimiter.
 
-- **Supported Data Types**: STRING, CHAR
+- **Supported Data Types**: `STRING`, `CHAR`
 - **Behavior**: Same as `listagg` - concatenates values using the specified 
delimiter
 - **Null Handling**: Null values are skipped
 - **Delimiter**: Specify delimiter directly in the aggregation function 
(default is comma `,`)
@@ -831,7 +831,7 @@ TableDescriptor.builder()
 
 Aggregates serialized 32-bit RoaringBitmap values by union.
 
-- **Supported Data Types**: BYTES
+- **Supported Data Types**: `BYTES`
 - **Behavior**: ORs incoming bitmaps with the accumulator
 - **Null Handling**: Null values are ignored
 
@@ -868,7 +868,7 @@ Schema schema = Schema.newBuilder()
 
 Aggregates serialized 64-bit RoaringBitmap values by union.
 
-- **Supported Data Types**: BYTES
+- **Supported Data Types**: `BYTES`
 - **Behavior**: ORs incoming bitmaps with the accumulator
 - **Null Handling**: Null values are ignored
 
@@ -905,7 +905,7 @@ Schema schema = Schema.newBuilder()
 
 Evaluates whether all boolean values in a set are true (logical AND).
 
-- **Supported Data Types**: BOOLEAN
+- **Supported Data Types**: `BOOLEAN`
 - **Behavior**: Returns true only if all values are true
 - **Null Handling**: Null values are ignored
 
@@ -962,7 +962,7 @@ TableDescriptor.builder()
 
 Checks if at least one boolean value in a set is true (logical OR).
 
-- **Supported Data Types**: BOOLEAN
+- **Supported Data Types**: `BOOLEAN`
 - **Behavior**: Returns true if any value is true
 - **Null Handling**: Null values are ignored
 

Reply via email to