This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 294395128b9206dd2c28ad4b2b2b98ac78a09e63 Author: Jackeyzhe <[email protected]> AuthorDate: Wed Feb 11 11:34:54 2026 +0800 [kv] Validate column data types for aggregation function during table creation (#2323) --- .../fluss/client/admin/FlussAdminITCase.java | 27 +++++++ .../org/apache/fluss/metadata/AggFunction.java | 14 +++- .../org/apache/fluss/metadata/AggFunctionType.java | 89 +++++++++++++++++++++- .../org/apache/fluss/metadata/AggFunctions.java | 3 + .../apache/fluss/metadata/TableDescriptorTest.java | 45 ++++++++++- .../fluss/flink/utils/FlinkAggFunctionParser.java | 15 +++- .../apache/fluss/flink/utils/FlinkConversions.java | 2 +- .../flink/utils/FlinkAggFunctionParserTest.java | 38 ++++++--- .../aggregate/factory/FieldListaggAggFactory.java | 2 +- .../aggregate/factory/FieldStringAggFactory.java | 2 +- .../server/utils/TableDescriptorValidation.java | 12 ++- .../docs/table-design/merge-engines/aggregation.md | 18 ++--- 12 files changed, 229 insertions(+), 38 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 063fad65b..db1c0f01c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -1711,6 +1711,33 @@ class FlussAdminITCase extends ClientToServerITCaseBase { admin.dropTable(tablePath, false).get(); } + @Test + public void testCreateTableWithInvalidAggFunctionDataType() throws Exception { + TablePath tablePath = + TablePath.of( + DEFAULT_TABLE_PATH.getDatabaseName(), + "test_invalid_data_type_for_aggfunction"); + Map<String, String> propertiesAggregate = new HashMap<>(); + propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + + Schema schema1 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("sum_value", DataTypes.STRING(), AggFunctions.SUM()) + .primaryKey("id") + .build(); + TableDescriptor t1 = + TableDescriptor.builder() + .schema(schema1) + .comment("aggregate merge engine table") + .properties(propertiesAggregate) + .build(); + assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("Data type for sum column must be"); + } + /** * Test that aggregate merge engine tables cannot use WAL changelog image mode. * diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java index 996b3d3e3..1f31a8c22 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java @@ -18,6 +18,7 @@ package org.apache.fluss.metadata; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.types.DataType; import javax.annotation.Nullable; @@ -103,6 +104,17 @@ public final class AggFunction implements Serializable { return !parameters.isEmpty(); } + /** + * Validates data type of this aggregation function. This method checks that the field data type + * is valid + * + * @param fieldType the field data type + * @throws IllegalArgumentException if any data type is invalid + */ + public void validateDataType(DataType fieldType) { + type.validateDataType(fieldType); + } + /** * Validates all parameters of this aggregation function. * @@ -115,7 +127,7 @@ public final class AggFunction implements Serializable { * * @throws IllegalArgumentException if any parameter is invalid */ - public void validate() { + public void validateParameters() { for (Map.Entry<String, String> entry : parameters.entrySet()) { type.validateParameter(entry.getKey(), entry.getValue()); } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java index a8a5e56bc..f0026fbba 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java @@ -18,11 +18,17 @@ package org.apache.fluss.metadata; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; +import java.util.Arrays; import java.util.Collections; import java.util.Locale; import java.util.Set; +import static org.apache.fluss.metadata.AggFunctions.PARAM_DELIMITER; +import static org.apache.fluss.utils.Preconditions.checkArgument; + /** * Aggregation function type for aggregate merge engine. * @@ -55,8 +61,35 @@ public enum AggFunctionType { RBM32, RBM64; - /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */ - public static final String PARAM_DELIMITER = "delimiter"; + // ------------------------------------------------------------------------------------------ + + static final DataTypeRoot[] NUMERIC_TYPES = + new DataTypeRoot[] { + DataTypeRoot.TINYINT, + DataTypeRoot.SMALLINT, + DataTypeRoot.INTEGER, + DataTypeRoot.BIGINT, + DataTypeRoot.FLOAT, + DataTypeRoot.DOUBLE, + DataTypeRoot.DECIMAL + }; + + static final DataTypeRoot[] MAX_MIN_TYPES = + new DataTypeRoot[] { + DataTypeRoot.CHAR, + DataTypeRoot.STRING, + DataTypeRoot.TINYINT, + DataTypeRoot.SMALLINT, + DataTypeRoot.INTEGER, + DataTypeRoot.BIGINT, + DataTypeRoot.FLOAT, + DataTypeRoot.DOUBLE, + DataTypeRoot.DECIMAL, + DataTypeRoot.DATE, + DataTypeRoot.TIME_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE + }; /** * Returns the set of supported parameter names for this aggregation function. @@ -75,6 +108,41 @@ public enum AggFunctionType { } } + /** + * Returns the supported data type roots for this aggregation function. + * + * @return an array of supported DataTypeRoot values + */ + public DataTypeRoot[] getSupportedDataTypeRoots() { + switch (this) { + case BOOL_AND: + case BOOL_OR: + return new DataTypeRoot[] {DataTypeRoot.BOOLEAN}; + case RBM32: + case RBM64: + return new DataTypeRoot[] {DataTypeRoot.BYTES}; + case LISTAGG: + case STRING_AGG: + return new DataTypeRoot[] {DataTypeRoot.STRING, DataTypeRoot.CHAR}; + case SUM: + case PRODUCT: + return NUMERIC_TYPES; + + case MAX: + case MIN: + return MAX_MIN_TYPES; + + case LAST_VALUE: + case LAST_VALUE_IGNORE_NULLS: + case FIRST_VALUE: + case FIRST_VALUE_IGNORE_NULLS: + // all data types are supported + return DataTypeRoot.values(); + default: + throw new IllegalStateException("Unsupported aggregation function type: " + this); + } + } + /** * Validates a parameter value for this aggregation function. * @@ -100,7 +168,7 @@ public enum AggFunctionType { switch (this) { case LISTAGG: case STRING_AGG: - if (PARAM_DELIMITER.equals(parameterName)) { + if (AggFunctions.PARAM_DELIMITER.equals(parameterName)) { if (parameterValue == null || parameterValue.isEmpty()) { throw new IllegalArgumentException( String.format( @@ -115,6 +183,21 @@ public enum AggFunctionType { } } + /** + * Validates a data type for this aggregation function. + * + * @param fieldType the field data type + * @throws IllegalArgumentException if the data type is invalid + */ + public void validateDataType(DataType fieldType) { + checkArgument( + fieldType.isAnyOf(getSupportedDataTypeRoots()), + "Data type for %s column must be part of %s but was '%s'.", + toString(), + Arrays.deepToString(getSupportedDataTypeRoots()), + fieldType); + } + /** * Converts a string to an AggFunctionType enum value. * diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java index 3fb647158..86a704ef6 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java @@ -171,6 +171,9 @@ public final class AggFunctions { */ public static final String DEFAULT_LISTAGG_DELIMITER = ","; + /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */ + public static final String PARAM_DELIMITER = "delimiter"; + /** * Creates a LISTAGG aggregation function with default comma delimiter. * diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java index 430d47d90..bd9961075 100644 --- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java @@ -23,6 +23,7 @@ import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -321,7 +322,7 @@ class TableDescriptorTest { @Test void testInvalidListaggParameterEmptyDelimiter() { // LISTAGG with empty delimiter - should fail - assertThatThrownBy(() -> AggFunctions.LISTAGG("").validate()) + assertThatThrownBy(() -> AggFunctions.LISTAGG("").validateParameters()) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("must be a non-empty string"); } @@ -332,7 +333,8 @@ class TableDescriptorTest { Map<String, String> params = new HashMap<>(); params.put("unknown_param", "value"); - assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.LISTAGG, params).validate()) + assertThatThrownBy( + () -> AggFunctions.of(AggFunctionType.LISTAGG, params).validateParameters()) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("unknown_param") .hasMessageContaining("not supported"); @@ -344,9 +346,46 @@ class TableDescriptorTest { Map<String, String> params = new HashMap<>(); params.put("some_param", "value"); - assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validate()) + assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validateParameters()) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("some_param") .hasMessageContaining("not supported"); } + + @Test + void testValidateAggFunctionWithDataType() { + Map<String, String> params = new HashMap<>(); + + // invalid case + assertThatThrownBy( + () -> + AggFunctions.of(AggFunctionType.BOOL_AND, params) + .validateDataType(DataTypes.STRING())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("column must be part of") + .hasMessageContaining( + Arrays.deepToString(AggFunctionType.BOOL_AND.getSupportedDataTypeRoots())); + + assertThatThrownBy( + () -> + AggFunctions.of(AggFunctionType.SUM, params) + .validateDataType(DataTypes.STRING())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("column must be part of") + .hasMessageContaining( + Arrays.deepToString(AggFunctionType.SUM.getSupportedDataTypeRoots())); + + assertThatThrownBy( + () -> + AggFunctions.of(AggFunctionType.MAX, params) + .validateDataType(DataTypes.BOOLEAN())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("column must be part of") + .hasMessageContaining( + Arrays.deepToString(AggFunctionType.MAX.getSupportedDataTypeRoots())); + + // valid case + AggFunctions.of(AggFunctionType.LAST_VALUE, params).validateDataType(DataTypes.STRING()); + AggFunctions.of(AggFunctionType.LISTAGG, params).validateDataType(DataTypes.STRING()); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java index a781a75d5..ad1785a98 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java @@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.AggFunctionType; import org.apache.fluss.metadata.AggFunctions; +import org.apache.fluss.types.DataType; import org.apache.flink.configuration.Configuration; @@ -59,7 +60,8 @@ public class FlinkAggFunctionParser { * * <p>Returns empty if no aggregation function is configured for this column. */ - public static Optional<AggFunction> parseAggFunction(String columnName, Configuration options) { + public static Optional<AggFunction> parseAggFunction( + String columnName, DataType dataType, Configuration options) { // Check column-level configuration: fields.<column>.agg String columnFuncKey = AGG_PREFIX + columnName + AGG_SUFFIX; @@ -75,7 +77,7 @@ public class FlinkAggFunctionParser { Map<String, String> params = collectParameters(AGG_PREFIX + columnName + "." + funcName + ".", options); - return Optional.of(createAggFunction(type, params, columnName)); + return Optional.of(createAggFunction(type, params, columnName, dataType)); } /** @@ -182,10 +184,14 @@ public class FlinkAggFunctionParser { * @param type the aggregation function type * @param params the function parameters (may be empty) * @param columnName the column name (used for error messages) + * @param dataType the column data type (use for validate) * @return the created aggregation function */ private static AggFunction createAggFunction( - AggFunctionType type, Map<String, String> params, String columnName) { + AggFunctionType type, + Map<String, String> params, + String columnName, + DataType dataType) { // Use generic factory method to create aggregation function // This delegates parameter handling and validation to the underlying implementation AggFunction aggFunction = @@ -193,7 +199,8 @@ public class FlinkAggFunctionParser { // Validate all parameters to ensure they are supported and valid try { - aggFunction.validate(); + aggFunction.validateParameters(); + aggFunction.validateDataType(dataType); } catch (IllegalArgumentException e) { throw new IllegalArgumentException( String.format( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index f2d34dc14..2c908e128 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -691,7 +691,7 @@ public class FlinkConversions { // Parse and add aggregation function if needed if (parseAggFunction) { Optional<AggFunction> aggFunction = - FlinkAggFunctionParser.parseAggFunction(columnName, tableConf); + FlinkAggFunctionParser.parseAggFunction(columnName, flussDataType, tableConf); if (aggFunction.isPresent()) { schemaBuilder.column(columnName, flussDataType, aggFunction.get()); } else { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java index 62e861da7..fe019a08a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java @@ -20,6 +20,7 @@ package org.apache.fluss.flink.utils; import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.AggFunctionType; import org.apache.fluss.metadata.AggFunctions; +import org.apache.fluss.types.DataTypes; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -37,7 +38,8 @@ class FlinkAggFunctionParserTest { @Test void testParseNoAggFunction() { Configuration options = new Configuration(); - assertThat(FlinkAggFunctionParser.parseAggFunction("total", options)).isEmpty(); + assertThat(FlinkAggFunctionParser.parseAggFunction("total", DataTypes.INT(), options)) + .isEmpty(); } @Test @@ -45,7 +47,8 @@ class FlinkAggFunctionParserTest { Configuration options = new Configuration(); options.setString("fields.total.agg", "sum"); - Optional<AggFunction> result = FlinkAggFunctionParser.parseAggFunction("total", options); + Optional<AggFunction> result = + FlinkAggFunctionParser.parseAggFunction("total", DataTypes.INT(), options); assertThat(result).isPresent(); assertThat(result.get().getType()).isEqualTo(AggFunctionType.SUM); @@ -58,7 +61,8 @@ class FlinkAggFunctionParserTest { options.setString("fields.tags.agg", "listagg"); options.setString("fields.tags.listagg.delimiter", ";"); - Optional<AggFunction> result = FlinkAggFunctionParser.parseAggFunction("tags", options); + Optional<AggFunction> result = + FlinkAggFunctionParser.parseAggFunction("tags", DataTypes.STRING(), options); assertThat(result).isPresent(); assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG); @@ -76,7 +80,8 @@ class FlinkAggFunctionParserTest { options.setString("fields.tags.agg", "LISTAGG"); options.setString("fields.tags.listagg.delimiter", ";"); - Optional<AggFunction> result = FlinkAggFunctionParser.parseAggFunction("tags", options); + Optional<AggFunction> result = + FlinkAggFunctionParser.parseAggFunction("tags", DataTypes.STRING(), options); assertThat(result).isPresent(); assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG); @@ -91,9 +96,12 @@ class FlinkAggFunctionParserTest { options.setString("fields.col2.agg", "listagg"); options.setString("fields.col2.listagg.delimiter", "|"); // This should not affect col1 - Optional<AggFunction> col1Func = FlinkAggFunctionParser.parseAggFunction("col1", options); - Optional<AggFunction> col2Func = FlinkAggFunctionParser.parseAggFunction("col2", options); - Optional<AggFunction> col3Func = FlinkAggFunctionParser.parseAggFunction("col3", options); + Optional<AggFunction> col1Func = + FlinkAggFunctionParser.parseAggFunction("col1", DataTypes.INT(), options); + Optional<AggFunction> col2Func = + FlinkAggFunctionParser.parseAggFunction("col2", DataTypes.STRING(), options); + Optional<AggFunction> col3Func = + FlinkAggFunctionParser.parseAggFunction("col3", DataTypes.STRING(), options); // col1 should have SUM without parameters assertThat(col1Func).isPresent(); @@ -114,7 +122,10 @@ class FlinkAggFunctionParserTest { Configuration options = new Configuration(); options.setString("fields.total.agg", "invalid_function"); - assertThatThrownBy(() -> FlinkAggFunctionParser.parseAggFunction("total", options)) + assertThatThrownBy( + () -> + FlinkAggFunctionParser.parseAggFunction( + "total", DataTypes.STRING(), options)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Unknown aggregation function") .hasMessageContaining("invalid_function"); @@ -125,7 +136,10 @@ class FlinkAggFunctionParserTest { Configuration options = new Configuration(); options.setString("fields.total.agg", ""); - assertThatThrownBy(() -> FlinkAggFunctionParser.parseAggFunction("total", options)) + assertThatThrownBy( + () -> + FlinkAggFunctionParser.parseAggFunction( + "total", DataTypes.STRING(), options)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Empty aggregation function name"); } @@ -142,8 +156,10 @@ class FlinkAggFunctionParserTest { // Parse them back Configuration config = Configuration.fromMap(options); - Optional<AggFunction> col1Func = FlinkAggFunctionParser.parseAggFunction("col1", config); - Optional<AggFunction> col2Func = FlinkAggFunctionParser.parseAggFunction("col2", config); + Optional<AggFunction> col1Func = + FlinkAggFunctionParser.parseAggFunction("col1", DataTypes.INT(), config); + Optional<AggFunction> col2Func = + FlinkAggFunctionParser.parseAggFunction("col2", DataTypes.STRING(), config); // Verify they match the original functions assertThat(col1Func).isPresent(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java index aeb4b01d6..5c1a42b3f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldListaggAggFactory.java @@ -42,7 +42,7 @@ public class FieldListaggAggFactory implements FieldAggregatorFactory { fieldType); // Get delimiter from function parameters, default to comma - String delimiter = aggFunction.getParameter(AggFunctionType.PARAM_DELIMITER); + String delimiter = aggFunction.getParameter(AggFunctions.PARAM_DELIMITER); if (delimiter == null) { delimiter = AggFunctions.DEFAULT_LISTAGG_DELIMITER; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java index 784776f36..791bbba75 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldStringAggFactory.java @@ -48,7 +48,7 @@ public class FieldStringAggFactory implements FieldAggregatorFactory { fieldType); // Get delimiter from function parameters, default to comma - String delimiter = aggFunction.getParameter(AggFunctionType.PARAM_DELIMITER); + String delimiter = aggFunction.getParameter(AggFunctions.PARAM_DELIMITER); if (delimiter == null) { delimiter = AggFunctions.DEFAULT_LISTAGG_DELIMITER; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 760ad50ae..0c8e9ef35 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -322,11 +322,13 @@ public class TableDescriptorValidation { /** * Validates aggregation function parameters in the schema. * - * <p>This method delegates to {@link AggFunction#validate()} to ensure all parameters are valid + * <p>This method delegates to {@link AggFunction#validateParameters()} to ensure all parameters + * are valid and {@link AggFunction#validateDataType(DataType)} to ensure data type are valid * according to the function's requirements. * * @param schema the schema to validate - * @throws InvalidConfigException if any aggregation function has invalid parameters + * @throws InvalidConfigException if any aggregation function has invalid parameters or data + * types */ private static void validateAggregationFunctionParameters(Schema schema) { // Get primary key columns for early exit @@ -343,9 +345,11 @@ public class TableDescriptorValidation { continue; } - // Validate aggregation function parameters + // Validate aggregation function parameters and data type try { - aggFunctionOpt.get().validate(); + AggFunction aggFunction = aggFunctionOpt.get(); + aggFunction.validateParameters(); + aggFunction.validateDataType(column.getDataType()); } catch (IllegalArgumentException e) { throw new InvalidConfigException( String.format( diff --git a/website/docs/table-design/merge-engines/aggregation.md b/website/docs/table-design/merge-engines/aggregation.md index 4523e830a..7d585f210 100644 --- a/website/docs/table-design/merge-engines/aggregation.md +++ b/website/docs/table-design/merge-engines/aggregation.md @@ -265,7 +265,7 @@ TableDescriptor.builder() Computes the product of values across multiple rows. -- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL +- **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL` - **Behavior**: Multiplies incoming values with the accumulator - **Null Handling**: Null values are ignored @@ -321,7 +321,7 @@ TableDescriptor.builder() Identifies and retains the maximum value. -- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ +- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ` - **Behavior**: Keeps the larger value between accumulator and incoming value - **Null Handling**: Null values are ignored @@ -379,7 +379,7 @@ TableDescriptor.builder() Identifies and retains the minimum value. -- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ +- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ` - **Behavior**: Keeps the smaller value between accumulator and incoming value - **Null Handling**: Null values are ignored @@ -702,7 +702,7 @@ TableDescriptor.builder() Concatenates multiple string values into a single string with a delimiter. -- **Supported Data Types**: STRING, CHAR +- **Supported Data Types**: `STRING`, `CHAR` - **Behavior**: Concatenates values using the specified delimiter - **Null Handling**: Null values are skipped - **Delimiter**: Specify delimiter directly in the aggregation function (default is comma `,`) @@ -764,7 +764,7 @@ TableDescriptor.builder() Alias for `listagg`. Concatenates multiple string values into a single string with a delimiter. -- **Supported Data Types**: STRING, CHAR +- **Supported Data Types**: `STRING`, `CHAR` - **Behavior**: Same as `listagg` - concatenates values using the specified delimiter - **Null Handling**: Null values are skipped - **Delimiter**: Specify delimiter directly in the aggregation function (default is comma `,`) @@ -831,7 +831,7 @@ TableDescriptor.builder() Aggregates serialized 32-bit RoaringBitmap values by union. -- **Supported Data Types**: BYTES +- **Supported Data Types**: `BYTES` - **Behavior**: ORs incoming bitmaps with the accumulator - **Null Handling**: Null values are ignored @@ -868,7 +868,7 @@ Schema schema = Schema.newBuilder() Aggregates serialized 64-bit RoaringBitmap values by union. -- **Supported Data Types**: BYTES +- **Supported Data Types**: `BYTES` - **Behavior**: ORs incoming bitmaps with the accumulator - **Null Handling**: Null values are ignored @@ -905,7 +905,7 @@ Schema schema = Schema.newBuilder() Evaluates whether all boolean values in a set are true (logical AND). -- **Supported Data Types**: BOOLEAN +- **Supported Data Types**: `BOOLEAN` - **Behavior**: Returns true only if all values are true - **Null Handling**: Null values are ignored @@ -962,7 +962,7 @@ TableDescriptor.builder() Checks if at least one boolean value in a set is true (logical OR). -- **Supported Data Types**: BOOLEAN +- **Supported Data Types**: `BOOLEAN` - **Behavior**: Returns true if any value is true - **Null Handling**: Null values are ignored
