This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d54ac70cb [kv] Validate column data types for aggregation function
during table creation (#2323)
d54ac70cb is described below
commit d54ac70cbcce7b5467153e377fb4dfdb82244959
Author: Jackeyzhe <[email protected]>
AuthorDate: Wed Feb 11 11:34:54 2026 +0800
[kv] Validate column data types for aggregation function during table
creation (#2323)
---
.../fluss/client/admin/FlussAdminITCase.java | 27 +++++++
.../org/apache/fluss/metadata/AggFunction.java | 14 +++-
.../org/apache/fluss/metadata/AggFunctionType.java | 89 +++++++++++++++++++++-
.../org/apache/fluss/metadata/AggFunctions.java | 3 +
.../apache/fluss/metadata/TableDescriptorTest.java | 45 ++++++++++-
.../fluss/flink/utils/FlinkAggFunctionParser.java | 15 +++-
.../apache/fluss/flink/utils/FlinkConversions.java | 2 +-
.../flink/utils/FlinkAggFunctionParserTest.java | 38 ++++++---
.../aggregate/factory/FieldListaggAggFactory.java | 2 +-
.../aggregate/factory/FieldStringAggFactory.java | 2 +-
.../server/utils/TableDescriptorValidation.java | 12 ++-
.../docs/table-design/merge-engines/aggregation.md | 18 ++---
12 files changed, 229 insertions(+), 38 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 063fad65b..db1c0f01c 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1711,6 +1711,33 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
admin.dropTable(tablePath, false).get();
}
+ @Test
+ public void testCreateTableWithInvalidAggFunctionDataType() throws
Exception {
+ TablePath tablePath =
+ TablePath.of(
+ DEFAULT_TABLE_PATH.getDatabaseName(),
+ "test_invalid_data_type_for_aggfunction");
+ Map<String, String> propertiesAggregate = new HashMap<>();
+ propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(),
"aggregation");
+
+ Schema schema1 =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("sum_value", DataTypes.STRING(),
AggFunctions.SUM())
+ .primaryKey("id")
+ .build();
+ TableDescriptor t1 =
+ TableDescriptor.builder()
+ .schema(schema1)
+ .comment("aggregate merge engine table")
+ .properties(propertiesAggregate)
+ .build();
+ assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())
+ .cause()
+ .isInstanceOf(InvalidConfigException.class)
+ .hasMessageContaining("Data type for sum column must be");
+ }
+
/**
* Test that aggregate merge engine tables cannot use WAL changelog image
mode.
*
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
index 996b3d3e3..1f31a8c22 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
@@ -18,6 +18,7 @@
package org.apache.fluss.metadata;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.types.DataType;
import javax.annotation.Nullable;
@@ -103,6 +104,17 @@ public final class AggFunction implements Serializable {
return !parameters.isEmpty();
}
+ /**
+ * Validates the data type of this aggregation function. This method checks
that the field data type
+ * is valid.
+ *
+ * @param fieldType the field data type
+ * @throws IllegalArgumentException if any data type is invalid
+ */
+ public void validateDataType(DataType fieldType) {
+ type.validateDataType(fieldType);
+ }
+
/**
* Validates all parameters of this aggregation function.
*
@@ -115,7 +127,7 @@ public final class AggFunction implements Serializable {
*
* @throws IllegalArgumentException if any parameter is invalid
*/
- public void validate() {
+ public void validateParameters() {
for (Map.Entry<String, String> entry : parameters.entrySet()) {
type.validateParameter(entry.getKey(), entry.getValue());
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
index a8a5e56bc..f0026fbba 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
@@ -18,11 +18,17 @@
package org.apache.fluss.metadata;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
+import static org.apache.fluss.metadata.AggFunctions.PARAM_DELIMITER;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
/**
* Aggregation function type for aggregate merge engine.
*
@@ -55,8 +61,35 @@ public enum AggFunctionType {
RBM32,
RBM64;
- /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions.
*/
- public static final String PARAM_DELIMITER = "delimiter";
+ //
------------------------------------------------------------------------------------------
+
+ static final DataTypeRoot[] NUMERIC_TYPES =
+ new DataTypeRoot[] {
+ DataTypeRoot.TINYINT,
+ DataTypeRoot.SMALLINT,
+ DataTypeRoot.INTEGER,
+ DataTypeRoot.BIGINT,
+ DataTypeRoot.FLOAT,
+ DataTypeRoot.DOUBLE,
+ DataTypeRoot.DECIMAL
+ };
+
+ static final DataTypeRoot[] MAX_MIN_TYPES =
+ new DataTypeRoot[] {
+ DataTypeRoot.CHAR,
+ DataTypeRoot.STRING,
+ DataTypeRoot.TINYINT,
+ DataTypeRoot.SMALLINT,
+ DataTypeRoot.INTEGER,
+ DataTypeRoot.BIGINT,
+ DataTypeRoot.FLOAT,
+ DataTypeRoot.DOUBLE,
+ DataTypeRoot.DECIMAL,
+ DataTypeRoot.DATE,
+ DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+ };
/**
* Returns the set of supported parameter names for this aggregation
function.
@@ -75,6 +108,41 @@ public enum AggFunctionType {
}
}
+ /**
+ * Returns the supported data type roots for this aggregation function.
+ *
+ * @return an array of supported DataTypeRoot values
+ */
+ public DataTypeRoot[] getSupportedDataTypeRoots() {
+ switch (this) {
+ case BOOL_AND:
+ case BOOL_OR:
+ return new DataTypeRoot[] {DataTypeRoot.BOOLEAN};
+ case RBM32:
+ case RBM64:
+ return new DataTypeRoot[] {DataTypeRoot.BYTES};
+ case LISTAGG:
+ case STRING_AGG:
+ return new DataTypeRoot[] {DataTypeRoot.STRING,
DataTypeRoot.CHAR};
+ case SUM:
+ case PRODUCT:
+ return NUMERIC_TYPES;
+
+ case MAX:
+ case MIN:
+ return MAX_MIN_TYPES;
+
+ case LAST_VALUE:
+ case LAST_VALUE_IGNORE_NULLS:
+ case FIRST_VALUE:
+ case FIRST_VALUE_IGNORE_NULLS:
+ // all data types are supported
+ return DataTypeRoot.values();
+ default:
+ throw new IllegalStateException("Unsupported aggregation
function type: " + this);
+ }
+ }
+
/**
* Validates a parameter value for this aggregation function.
*
@@ -100,7 +168,7 @@ public enum AggFunctionType {
switch (this) {
case LISTAGG:
case STRING_AGG:
- if (PARAM_DELIMITER.equals(parameterName)) {
+ if (AggFunctions.PARAM_DELIMITER.equals(parameterName)) {
if (parameterValue == null || parameterValue.isEmpty()) {
throw new IllegalArgumentException(
String.format(
@@ -115,6 +183,21 @@ public enum AggFunctionType {
}
}
+ /**
+ * Validates a data type for this aggregation function.
+ *
+ * @param fieldType the field data type
+ * @throws IllegalArgumentException if the data type is invalid
+ */
+ public void validateDataType(DataType fieldType) {
+ checkArgument(
+ fieldType.isAnyOf(getSupportedDataTypeRoots()),
+ "Data type for %s column must be part of %s but was '%s'.",
+ toString(),
+ Arrays.deepToString(getSupportedDataTypeRoots()),
+ fieldType);
+ }
+
/**
* Converts a string to an AggFunctionType enum value.
*
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
index 3fb647158..86a704ef6 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
@@ -171,6 +171,9 @@ public final class AggFunctions {
*/
public static final String DEFAULT_LISTAGG_DELIMITER = ",";
+ /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions.
*/
+ public static final String PARAM_DELIMITER = "delimiter";
+
/**
* Creates a LISTAGG aggregation function with default comma delimiter.
*
diff --git
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
index 430d47d90..bd9961075 100644
---
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java
@@ -23,6 +23,7 @@ import org.apache.fluss.types.DataTypes;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -321,7 +322,7 @@ class TableDescriptorTest {
@Test
void testInvalidListaggParameterEmptyDelimiter() {
// LISTAGG with empty delimiter - should fail
- assertThatThrownBy(() -> AggFunctions.LISTAGG("").validate())
+ assertThatThrownBy(() -> AggFunctions.LISTAGG("").validateParameters())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("must be a non-empty string");
}
@@ -332,7 +333,8 @@ class TableDescriptorTest {
Map<String, String> params = new HashMap<>();
params.put("unknown_param", "value");
- assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.LISTAGG,
params).validate())
+ assertThatThrownBy(
+ () -> AggFunctions.of(AggFunctionType.LISTAGG,
params).validateParameters())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("unknown_param")
.hasMessageContaining("not supported");
@@ -344,9 +346,46 @@ class TableDescriptorTest {
Map<String, String> params = new HashMap<>();
params.put("some_param", "value");
- assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM,
params).validate())
+ assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM,
params).validateParameters())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("some_param")
.hasMessageContaining("not supported");
}
+
+ @Test
+ void testValidateAggFunctionWithDataType() {
+ Map<String, String> params = new HashMap<>();
+
+ // invalid case
+ assertThatThrownBy(
+ () ->
+ AggFunctions.of(AggFunctionType.BOOL_AND,
params)
+ .validateDataType(DataTypes.STRING()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("column must be part of")
+ .hasMessageContaining(
+
Arrays.deepToString(AggFunctionType.BOOL_AND.getSupportedDataTypeRoots()));
+
+ assertThatThrownBy(
+ () ->
+ AggFunctions.of(AggFunctionType.SUM, params)
+ .validateDataType(DataTypes.STRING()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("column must be part of")
+ .hasMessageContaining(
+
Arrays.deepToString(AggFunctionType.SUM.getSupportedDataTypeRoots()));
+
+ assertThatThrownBy(
+ () ->
+ AggFunctions.of(AggFunctionType.MAX, params)
+ .validateDataType(DataTypes.BOOLEAN()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("column must be part of")
+ .hasMessageContaining(
+
Arrays.deepToString(AggFunctionType.MAX.getSupportedDataTypeRoots()));
+
+ // valid case
+ AggFunctions.of(AggFunctionType.LAST_VALUE,
params).validateDataType(DataTypes.STRING());
+ AggFunctions.of(AggFunctionType.LISTAGG,
params).validateDataType(DataTypes.STRING());
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
index a781a75d5..ad1785a98 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.AggFunctionType;
import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.types.DataType;
import org.apache.flink.configuration.Configuration;
@@ -59,7 +60,8 @@ public class FlinkAggFunctionParser {
*
* <p>Returns empty if no aggregation function is configured for this
column.
*/
- public static Optional<AggFunction> parseAggFunction(String columnName,
Configuration options) {
+ public static Optional<AggFunction> parseAggFunction(
+ String columnName, DataType dataType, Configuration options) {
// Check column-level configuration: fields.<column>.agg
String columnFuncKey = AGG_PREFIX + columnName + AGG_SUFFIX;
@@ -75,7 +77,7 @@ public class FlinkAggFunctionParser {
Map<String, String> params =
collectParameters(AGG_PREFIX + columnName + "." + funcName +
".", options);
- return Optional.of(createAggFunction(type, params, columnName));
+ return Optional.of(createAggFunction(type, params, columnName,
dataType));
}
/**
@@ -182,10 +184,14 @@ public class FlinkAggFunctionParser {
* @param type the aggregation function type
* @param params the function parameters (may be empty)
* @param columnName the column name (used for error messages)
+ * @param dataType the column data type (used for validation)
* @return the created aggregation function
*/
private static AggFunction createAggFunction(
- AggFunctionType type, Map<String, String> params, String
columnName) {
+ AggFunctionType type,
+ Map<String, String> params,
+ String columnName,
+ DataType dataType) {
// Use generic factory method to create aggregation function
// This delegates parameter handling and validation to the underlying
implementation
AggFunction aggFunction =
@@ -193,7 +199,8 @@ public class FlinkAggFunctionParser {
// Validate all parameters to ensure they are supported and valid
try {
- aggFunction.validate();
+ aggFunction.validateParameters();
+ aggFunction.validateDataType(dataType);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index f2d34dc14..2c908e128 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -691,7 +691,7 @@ public class FlinkConversions {
// Parse and add aggregation function if needed
if (parseAggFunction) {
Optional<AggFunction> aggFunction =
- FlinkAggFunctionParser.parseAggFunction(columnName,
tableConf);
+ FlinkAggFunctionParser.parseAggFunction(columnName,
flussDataType, tableConf);
if (aggFunction.isPresent()) {
schemaBuilder.column(columnName, flussDataType,
aggFunction.get());
} else {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
index 62e861da7..fe019a08a 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.utils;
import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.AggFunctionType;
import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.types.DataTypes;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
@@ -37,7 +38,8 @@ class FlinkAggFunctionParserTest {
@Test
void testParseNoAggFunction() {
Configuration options = new Configuration();
- assertThat(FlinkAggFunctionParser.parseAggFunction("total",
options)).isEmpty();
+ assertThat(FlinkAggFunctionParser.parseAggFunction("total",
DataTypes.INT(), options))
+ .isEmpty();
}
@Test
@@ -45,7 +47,8 @@ class FlinkAggFunctionParserTest {
Configuration options = new Configuration();
options.setString("fields.total.agg", "sum");
- Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("total", options);
+ Optional<AggFunction> result =
+ FlinkAggFunctionParser.parseAggFunction("total",
DataTypes.INT(), options);
assertThat(result).isPresent();
assertThat(result.get().getType()).isEqualTo(AggFunctionType.SUM);
@@ -58,7 +61,8 @@ class FlinkAggFunctionParserTest {
options.setString("fields.tags.agg", "listagg");
options.setString("fields.tags.listagg.delimiter", ";");
- Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("tags", options);
+ Optional<AggFunction> result =
+ FlinkAggFunctionParser.parseAggFunction("tags",
DataTypes.STRING(), options);
assertThat(result).isPresent();
assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
@@ -76,7 +80,8 @@ class FlinkAggFunctionParserTest {
options.setString("fields.tags.agg", "LISTAGG");
options.setString("fields.tags.listagg.delimiter", ";");
- Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("tags", options);
+ Optional<AggFunction> result =
+ FlinkAggFunctionParser.parseAggFunction("tags",
DataTypes.STRING(), options);
assertThat(result).isPresent();
assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
@@ -91,9 +96,12 @@ class FlinkAggFunctionParserTest {
options.setString("fields.col2.agg", "listagg");
options.setString("fields.col2.listagg.delimiter", "|"); // This
should not affect col1
- Optional<AggFunction> col1Func =
FlinkAggFunctionParser.parseAggFunction("col1", options);
- Optional<AggFunction> col2Func =
FlinkAggFunctionParser.parseAggFunction("col2", options);
- Optional<AggFunction> col3Func =
FlinkAggFunctionParser.parseAggFunction("col3", options);
+ Optional<AggFunction> col1Func =
+ FlinkAggFunctionParser.parseAggFunction("col1",
DataTypes.INT(), options);
+ Optional<AggFunction> col2Func =
+ FlinkAggFunctionParser.parseAggFunction("col2",
DataTypes.STRING(), options);
+ Optional<AggFunction> col3Func =
+ FlinkAggFunctionParser.parseAggFunction("col3",
DataTypes.STRING(), options);
// col1 should have SUM without parameters
assertThat(col1Func).isPresent();
@@ -114,7 +122,10 @@ class FlinkAggFunctionParserTest {
Configuration options = new Configuration();
options.setString("fields.total.agg", "invalid_function");
- assertThatThrownBy(() ->
FlinkAggFunctionParser.parseAggFunction("total", options))
+ assertThatThrownBy(
+ () ->
+ FlinkAggFunctionParser.parseAggFunction(
+ "total", DataTypes.STRING(), options))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Unknown aggregation function")
.hasMessageContaining("invalid_function");
@@ -125,7 +136,10 @@ class FlinkAggFunctionParserTest {
Configuration options = new Configuration();
options.setString("fields.total.agg", "");
- assertThatThrownBy(() ->
FlinkAggFunctionParser.parseAggFunction("total", options))
+ assertThatThrownBy(
+ () ->
+ FlinkAggFunctionParser.parseAggFunction(
+ "total", DataTypes.STRING(), options))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Empty aggregation function name");
}
@@ -142,8 +156,10 @@ class FlinkAggFunctionParserTest {
// Parse them back
Configuration config = Configuration.fromMap(options);
- Optional<AggFunction> col1Func =
FlinkAggFunctionParser.parseAggFunction("col1", config);
- Optional<AggFunction> col2Func =
FlinkAggFunctionParser.parseAggFunction("col2", config);
+ Optional<AggFunction> col1Func =
+ FlinkAggFunctionParser.parseAggFunction("col1",
DataTypes.INT(), config);
+ Optional<AggFunction> col2Func =
+ FlinkAggFunctionParser.parseAggFunction("col2",
DataTypes.STRING(), config);
// Verify they match the original functions
assertThat(col1Func).isPresent();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
index aeb4b01d6..5c1a42b3f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java
@@ -42,7 +42,7 @@ public class FieldListaggAggFactory implements
FieldAggregatorFactory {
fieldType);
// Get delimiter from function parameters, default to comma
- String delimiter =
aggFunction.getParameter(AggFunctionType.PARAM_DELIMITER);
+ String delimiter =
aggFunction.getParameter(AggFunctions.PARAM_DELIMITER);
if (delimiter == null) {
delimiter = AggFunctions.DEFAULT_LISTAGG_DELIMITER;
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
index 784776f36..791bbba75 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java
@@ -48,7 +48,7 @@ public class FieldStringAggFactory implements
FieldAggregatorFactory {
fieldType);
// Get delimiter from function parameters, default to comma
- String delimiter =
aggFunction.getParameter(AggFunctionType.PARAM_DELIMITER);
+ String delimiter =
aggFunction.getParameter(AggFunctions.PARAM_DELIMITER);
if (delimiter == null) {
delimiter = AggFunctions.DEFAULT_LISTAGG_DELIMITER;
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 760ad50ae..0c8e9ef35 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -322,11 +322,13 @@ public class TableDescriptorValidation {
/**
* Validates aggregation function parameters in the schema.
*
- * <p>This method delegates to {@link AggFunction#validate()} to ensure
all parameters are valid
+ * <p>This method delegates to {@link AggFunction#validateParameters()} to
ensure all parameters
+ are valid and {@link AggFunction#validateDataType(DataType)} to ensure
data types are valid
* according to the function's requirements.
*
* @param schema the schema to validate
- * @throws InvalidConfigException if any aggregation function has invalid
parameters
+ * @throws InvalidConfigException if any aggregation function has invalid
parameters or data
+ * types
*/
private static void validateAggregationFunctionParameters(Schema schema) {
// Get primary key columns for early exit
@@ -343,9 +345,11 @@ public class TableDescriptorValidation {
continue;
}
- // Validate aggregation function parameters
+ // Validate aggregation function parameters and data type
try {
- aggFunctionOpt.get().validate();
+ AggFunction aggFunction = aggFunctionOpt.get();
+ aggFunction.validateParameters();
+ aggFunction.validateDataType(column.getDataType());
} catch (IllegalArgumentException e) {
throw new InvalidConfigException(
String.format(
diff --git a/website/docs/table-design/merge-engines/aggregation.md
b/website/docs/table-design/merge-engines/aggregation.md
index 4523e830a..7d585f210 100644
--- a/website/docs/table-design/merge-engines/aggregation.md
+++ b/website/docs/table-design/merge-engines/aggregation.md
@@ -265,7 +265,7 @@ TableDescriptor.builder()
Computes the product of values across multiple rows.
-- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`,
`DOUBLE`, `DECIMAL`
- **Behavior**: Multiplies incoming values with the accumulator
- **Null Handling**: Null values are ignored
@@ -321,7 +321,7 @@ TableDescriptor.builder()
Identifies and retains the maximum value.
-- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`,
`BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`,
`TIMESTAMP_LTZ`
- **Behavior**: Keeps the larger value between accumulator and incoming value
- **Null Handling**: Null values are ignored
@@ -379,7 +379,7 @@ TableDescriptor.builder()
Identifies and retains the minimum value.
-- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`,
`BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`,
`TIMESTAMP_LTZ`
- **Behavior**: Keeps the smaller value between accumulator and incoming value
- **Null Handling**: Null values are ignored
@@ -702,7 +702,7 @@ TableDescriptor.builder()
Concatenates multiple string values into a single string with a delimiter.
-- **Supported Data Types**: STRING, CHAR
+- **Supported Data Types**: `STRING`, `CHAR`
- **Behavior**: Concatenates values using the specified delimiter
- **Null Handling**: Null values are skipped
- **Delimiter**: Specify delimiter directly in the aggregation function
(default is comma `,`)
@@ -764,7 +764,7 @@ TableDescriptor.builder()
Alias for `listagg`. Concatenates multiple string values into a single string
with a delimiter.
-- **Supported Data Types**: STRING, CHAR
+- **Supported Data Types**: `STRING`, `CHAR`
- **Behavior**: Same as `listagg` - concatenates values using the specified
delimiter
- **Null Handling**: Null values are skipped
- **Delimiter**: Specify delimiter directly in the aggregation function
(default is comma `,`)
@@ -831,7 +831,7 @@ TableDescriptor.builder()
Aggregates serialized 32-bit RoaringBitmap values by union.
-- **Supported Data Types**: BYTES
+- **Supported Data Types**: `BYTES`
- **Behavior**: ORs incoming bitmaps with the accumulator
- **Null Handling**: Null values are ignored
@@ -868,7 +868,7 @@ Schema schema = Schema.newBuilder()
Aggregates serialized 64-bit RoaringBitmap values by union.
-- **Supported Data Types**: BYTES
+- **Supported Data Types**: `BYTES`
- **Behavior**: ORs incoming bitmaps with the accumulator
- **Null Handling**: Null values are ignored
@@ -905,7 +905,7 @@ Schema schema = Schema.newBuilder()
Evaluates whether all boolean values in a set are true (logical AND).
-- **Supported Data Types**: BOOLEAN
+- **Supported Data Types**: `BOOLEAN`
- **Behavior**: Returns true only if all values are true
- **Null Handling**: Null values are ignored
@@ -962,7 +962,7 @@ TableDescriptor.builder()
Checks if at least one boolean value in a set is true (logical OR).
-- **Supported Data Types**: BOOLEAN
+- **Supported Data Types**: `BOOLEAN`
- **Behavior**: Returns true if any value is true
- **Null Handling**: Null values are ignored