This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d10cb3590 [flink] Flink integration with Aggregation Merge Engine
(#2307)
d10cb3590 is described below
commit d10cb3590aeeac00f1480ba879d09c4526b65a45
Author: Yang Wang <[email protected]>
AuthorDate: Tue Jan 20 16:38:09 2026 +0800
[flink] Flink integration with Aggregation Merge Engine (#2307)
---
.../fluss/flink/catalog/FlinkTableFactory.java | 7 +-
.../fluss/flink/utils/FlinkAggFunctionParser.java | 207 +++++++++++++++++++++
.../apache/fluss/flink/utils/FlinkConversions.java | 72 ++++++-
.../fluss/flink/sink/FlinkTableSinkITCase.java | 130 +++++++++++++
.../flink/utils/FlinkAggFunctionParserTest.java | 155 +++++++++++++++
.../fluss/flink/utils/FlinkConversionsTest.java | 49 +++++
.../docs/table-design/merge-engines/aggregation.md | 11 +-
7 files changed, 618 insertions(+), 13 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index e50522c45..48d74e712 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -93,7 +93,8 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
Optional<DataLakeFormat> datalakeFormat =
getDatalakeFormat(tableOptions);
- List<String> prefixesToSkip = new ArrayList<>(Arrays.asList("table.",
"client."));
+ List<String> prefixesToSkip =
+ new ArrayList<>(Arrays.asList("table.", "client.", "fields."));
datalakeFormat.ifPresent(dataLakeFormat ->
prefixesToSkip.add(dataLakeFormat + "."));
helper.validateExcept(prefixesToSkip.toArray(new String[0]));
@@ -161,9 +162,9 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
final ReadableConfig tableOptions = helper.getOptions();
Optional<DataLakeFormat> datalakeFormat =
getDatalakeFormat(tableOptions);
if (datalakeFormat.isPresent()) {
- helper.validateExcept("table.", "client.", datalakeFormat.get() +
".");
+ helper.validateExcept("table.", "client.", "fields.",
datalakeFormat.get() + ".");
} else {
- helper.validateExcept("table.", "client.");
+ helper.validateExcept("table.", "client.", "fields.");
}
boolean isStreamingMode =
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
new file mode 100644
index 000000000..a781a75d5
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import org.apache.fluss.metadata.AggFunctions;
+
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Parser for aggregation function configuration from Flink table properties.
+ *
+ * <p>Configuration format (column-level only):
+ *
+ * <pre>
+ * fields.<column>.agg = <function_name>
+ * fields.<column>.<function_name>.<param_name> =
<param_value>
+ * </pre>
+ *
+ * <p>Examples:
+ *
+ * <pre>
+ * 'fields.tags.agg' = 'listagg'
+ * 'fields.tags.listagg.delimiter' = ';'
+ * </pre>
+ */
+public class FlinkAggFunctionParser {
+
+ private static final String AGG_PREFIX = "fields.";
+ private static final String AGG_SUFFIX = ".agg";
+
+ /**
+ * Parse aggregation function for a column.
+ *
+ * <p>Returns empty if no aggregation function is configured for this
column.
+ */
+ public static Optional<AggFunction> parseAggFunction(String columnName,
Configuration options) {
+
+ // Check column-level configuration: fields.<column>.agg
+ String columnFuncKey = AGG_PREFIX + columnName + AGG_SUFFIX;
+ if (!options.containsKey(columnFuncKey)) {
+ return Optional.empty();
+ }
+
+ // convert to lower case for consistent parameter option key
+ String funcName = options.getString(columnFuncKey, null).toLowerCase();
+ AggFunctionType type = parseAggFunctionType(funcName, columnName);
+
+ // Collect column-level parameters: fields.<column>.<function_name>.*
+ Map<String, String> params =
+ collectParameters(AGG_PREFIX + columnName + "." + funcName +
".", options);
+
+ return Optional.of(createAggFunction(type, params, columnName));
+ }
+
+ /**
+ * Format aggregation function to Flink table options.
+ *
+ * <p>Converts: AggFunction → Map<String, String>
+ *
+ * <p>Output format:
+ *
+ * <pre>
+ * fields.<column>.agg = <function_name>
+ * fields.<column>.<function_name>.<param> =
<value>
+ * </pre>
+ */
+ public static void formatAggFunctionToOptions(
+ String columnName, AggFunction aggFunction, Map<String, String>
options) {
+ // Set function name
+ String funcKey = AGG_PREFIX + columnName + AGG_SUFFIX;
+ // funcName is already in lower case
+ String funcName = aggFunction.getType().toString();
+ options.put(funcKey, funcName);
+
+ // Set parameters: fields.<column>.<function_name>.<param>
+ for (Map.Entry<String, String> param :
aggFunction.getParameters().entrySet()) {
+ String paramKey = AGG_PREFIX + columnName + "." + funcName + "." +
param.getKey();
+ options.put(paramKey, param.getValue());
+ }
+ }
+
+ /**
+ * Parse aggregation function type from string.
+ *
+ * @throws IllegalArgumentException if function name is invalid
+ */
+ @VisibleForTesting
+ static AggFunctionType parseAggFunctionType(@Nullable String funcName,
String columnName) {
+ if (funcName == null || funcName.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Empty aggregation function name for column
'%s'", columnName));
+ }
+
+ AggFunctionType type = AggFunctionType.fromString(funcName.trim());
+ if (type == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown aggregation function for column '%s':
%s\n"
+ + "Supported functions: %s",
+ columnName, funcName,
getSupportedFunctionNames()));
+ }
+
+ return type;
+ }
+
+ /**
+ * Get a comma-separated list of all supported aggregation function names.
+ *
+ * <p>This method dynamically generates the list from the AggFunctionType
enum, ensuring that
+ * the error message automatically includes any newly added functions
without requiring manual
+ * updates.
+ *
+ * @return comma-separated list of supported function names
+ */
+ private static String getSupportedFunctionNames() {
+ return Arrays.stream(AggFunctionType.values())
+ .map(AggFunctionType::toString)
+ .sorted()
+ .collect(Collectors.joining(", "));
+ }
+
+ /**
+ * Collect parameters with given prefix.
+ *
+ * <p>For prefix "fields.tags.listagg.", collects:
+ *
+ * <ul>
+ * <li>fields.tags.listagg.delimiter → delimiter
+ * <li>fields.tags.listagg.xxx → xxx
+ * </ul>
+ */
+ @VisibleForTesting
+ static Map<String, String> collectParameters(String prefix, Configuration
options) {
+ Map<String, String> params = new HashMap<>();
+
+ for (String key : options.keySet()) {
+ if (key.startsWith(prefix)) {
+ String paramName = key.substring(prefix.length());
+ String paramValue = options.getString(key, null);
+ if (paramValue != null) {
+ params.put(paramName, paramValue);
+ }
+ }
+ }
+
+ return params;
+ }
+
+ /**
+ * Create AggFunction from type and parameters.
+ *
+ * <p>This method uses the generic factory method from {@link
AggFunctions} to create
+ * aggregation functions, making it independent of specific function
implementations. When new
+ * aggregation functions are added to the system, no changes are required
here.
+ *
+ * @param type the aggregation function type
+ * @param params the function parameters (may be empty)
+ * @param columnName the column name (used for error messages)
+ * @return the created aggregation function
+ */
+ private static AggFunction createAggFunction(
+ AggFunctionType type, Map<String, String> params, String
columnName) {
+ // Use generic factory method to create aggregation function
+ // This delegates parameter handling and validation to the underlying
implementation
+ AggFunction aggFunction =
+ params.isEmpty() ? AggFunctions.of(type) :
AggFunctions.of(type, params);
+
+ // Validate all parameters to ensure they are supported and valid
+ try {
+ aggFunction.validate();
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid aggregation function configuration for
column '%s': %s",
+ columnName, e.getMessage()),
+ e);
+ }
+
+ return aggFunction;
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 5fc721c40..f2d34dc14 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -23,7 +23,9 @@ import org.apache.fluss.config.MemorySize;
import org.apache.fluss.config.Password;
import org.apache.fluss.flink.adapter.CatalogTableAdapter;
import org.apache.fluss.flink.catalog.FlinkCatalogFactory;
+import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
@@ -60,6 +62,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -67,6 +70,7 @@ import static
org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.fluss.config.ConfigOptions.TABLE_MERGE_ENGINE;
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static
org.apache.fluss.flink.FlinkConnectorOptions.AUTO_INCREMENT_FIELDS;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
@@ -127,6 +131,14 @@ public class FlinkConversions {
}
Schema schema = tableInfo.getSchema();
+
+ // convert aggregation functions to flink options
+ for (Schema.Column column : schema.getColumns()) {
+ if (column.getAggFunction().isPresent()) {
+ FlinkAggFunctionParser.formatAggFunctionToOptions(
+ column.getName(), column.getAggFunction().get(),
newOptions);
+ }
+ }
List<String> physicalColumns = schema.getColumnNames();
int columnCount =
physicalColumns.size()
@@ -197,17 +209,16 @@ public class FlinkConversions {
schemBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns());
}
- // first build schema with physical columns
+ // Check if aggregation merge engine is enabled to optimize parsing
+ boolean isAggregationEngine = isAggregationMergeEngine(flinkTableConf);
+
+ // Build schema with physical columns
resolvedSchema.getColumns().stream()
.filter(Column::isPhysical)
.forEachOrdered(
- column -> {
- schemBuilder
- .column(
- column.getName(),
-
FlinkConversions.toFlussType(column.getDataType()))
-
.withComment(column.getComment().orElse(null));
- });
+ column ->
+ addColumnToSchema(
+ schemBuilder, column, flinkTableConf,
isAggregationEngine));
// Configure auto-increment columns based on the
'auto-increment.fields' option.
if (flinkTableConf.containsKey(AUTO_INCREMENT_FIELDS.key())) {
@@ -649,6 +660,51 @@ public class FlinkConversions {
return builder.build();
}
+ /**
+ * Check if the table uses aggregation merge engine.
+ *
+ * @param tableConf the table configuration
+ * @return true if aggregation merge engine is enabled, false otherwise
+ */
+ private static boolean isAggregationMergeEngine(Configuration tableConf) {
+ String mergeEngineStr = tableConf.getString(TABLE_MERGE_ENGINE.key(),
null);
+ return mergeEngineStr != null
+ && MergeEngineType.fromString(mergeEngineStr) ==
MergeEngineType.AGGREGATION;
+ }
+
+ /**
+ * Add a column to the schema builder with optional aggregation function.
+ *
+ * @param schemaBuilder the schema builder
+ * @param column the Flink column
+ * @param tableConf the table configuration
+ * @param parseAggFunction whether to parse aggregation function from
config
+ */
+ private static void addColumnToSchema(
+ Schema.Builder schemaBuilder,
+ Column column,
+ Configuration tableConf,
+ boolean parseAggFunction) {
+ String columnName = column.getName();
+ DataType flussDataType = toFlussType(column.getDataType());
+
+ // Parse and add aggregation function if needed
+ if (parseAggFunction) {
+ Optional<AggFunction> aggFunction =
+ FlinkAggFunctionParser.parseAggFunction(columnName,
tableConf);
+ if (aggFunction.isPresent()) {
+ schemaBuilder.column(columnName, flussDataType,
aggFunction.get());
+ } else {
+ schemaBuilder.column(columnName, flussDataType);
+ }
+ } else {
+ schemaBuilder.column(columnName, flussDataType);
+ }
+
+ // Add comment if present
+ column.getComment().ifPresent(schemaBuilder::withComment);
+ }
+
private static Map<String, String> extractCustomProperties(
Configuration allProperties, Map<String, String>
flussTableProperties) {
Map<String, String> customProperties = new
HashMap<>(allProperties.toMap());
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 1af582e4d..f342379b1 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1307,6 +1307,136 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
+ @Test
+ void testComprehensiveAggregationFunctions() throws Exception {
+ // Test all 11 aggregate functions (each function tested once with
representative data type)
+ tEnv.executeSql(
+ "create table comprehensive_agg ("
+ + "id int not null primary key not enforced, "
+ // Numeric aggregations
+ + "sum_int int, "
+ + "sum_double double, "
+ + "product_int int, "
+ // Max/Min aggregations (representative types: int,
double, string,
+ // timestamp)
+ + "max_int int, "
+ + "max_timestamp timestamp(3), "
+ + "min_double double, "
+ + "min_string string, "
+ // Value selection aggregations (test with/without
nulls)
+ + "first_val int, "
+ + "first_val_non_null int, "
+ + "last_val int, "
+ + "last_val_non_null int, "
+ // Boolean aggregations
+ + "bool_and_val boolean, "
+ + "bool_or_val boolean, "
+ // String aggregation with custom delimiter
+ + "listagg_val string"
+ + ") with ("
+ + "'table.merge-engine' = 'aggregation', "
+ + "'fields.sum_int.agg' = 'sum', "
+ + "'fields.sum_double.agg' = 'sum', "
+ + "'fields.product_int.agg' = 'product', "
+ + "'fields.max_int.agg' = 'max', "
+ + "'fields.max_timestamp.agg' = 'max', "
+ + "'fields.min_double.agg' = 'min', "
+ + "'fields.min_string.agg' = 'min', "
+ + "'fields.first_val.agg' = 'first_value', "
+ + "'fields.first_val_non_null.agg' =
'first_value_ignore_nulls', "
+ + "'fields.last_val.agg' = 'last_value', "
+ + "'fields.last_val_non_null.agg' =
'last_value_ignore_nulls', "
+ + "'fields.bool_and_val.agg' = 'bool_and', "
+ + "'fields.bool_or_val.agg' = 'bool_or', "
+ + "'fields.listagg_val.agg' = 'listagg', "
+ + "'fields.listagg_val.listagg.delimiter' = '|')");
+
+ // Insert first batch - initial values
+ tEnv.executeSql(
+ "INSERT INTO comprehensive_agg VALUES ("
+ + "1, " // id
+ + "1000, 10.5, " // sum_int, sum_double
+ + "2, " // product_int
+ + "100, TIMESTAMP '2024-01-15 15:00:00', " //
max_int, max_timestamp
+ + "100.0, 'beta', " // min_double, min_string
+ + "100, 100, 100, 100, " // first_value,
first_value_ignore_nulls,
+ // last_value, last_value_ignore_nulls
+ + "true, false, " // bool_and_val, bool_or_val
+ + "'alpha'" // listagg_val
+ + ")")
+ .await();
+
+ // Insert second batch - trigger aggregation
+ tEnv.executeSql(
+ "INSERT INTO comprehensive_agg VALUES ("
+ + "1, " // id
+ + "2000, 20.5, " // sum: 1000+2000=3000,
10.5+20.5=31.0
+ + "3, " // product: 2*3=6
+ + "200, TIMESTAMP '2024-02-01 18:00:00', " //
max: 200, 2024-02-01
+ // 18:00
+ + "50.0, 'alpha', " // min: 50.0, alpha
+ + "200, 200, 200, 200, " // first: keep 100,
first_ignore_nulls:
+ // 200, last: 200, last_ignore_nulls: 200
+ + "true, true, " // bool_and: true AND
true=true, bool_or: false OR
+ // true=true
+ + "'beta'" // listagg: alpha|beta
+ + ")")
+ .await();
+
+ // Insert third batch - further aggregation with null handling test
+ tEnv.executeSql(
+ "INSERT INTO comprehensive_agg VALUES ("
+ + "1, " // id
+ + "3000, 30.5, " // sum: 3000+3000=6000,
31.0+30.5=61.5
+ + "5, " // product: 6*5=30
+ + "150, TIMESTAMP '2024-01-20 14:00:00', " //
max: keep 200, keep
+ // 2024-02-01 18:00
+ + "80.0, 'charlie', " // min: keep 50.0, keep
alpha
+ + "300, CAST(NULL AS INT), 300, 300, " //
first: keep 100, ignore
+ // null keep 200, last: 300,
+ // last_ignore_nulls: 300
+ + "false, true, " // bool_and: true AND
false=false, bool_or: true
+ // OR true=true
+ + "'gamma'" // listagg: alpha|beta|gamma
+ + ")")
+ .await();
+
+ // Query and verify aggregated results
+ CloseableIterator<Row> rowIter =
+ tEnv.executeSql("SELECT * FROM comprehensive_agg").collect();
+
+ // Expected results: changelog with 5 records (+I, -U, +U, -U, +U)
+ List<String> expectedRows =
+ Arrays.asList(
+ // First insert: initial values
+ "+I[1, 1000, 10.5, 2, 100, 2024-01-15T15:00, 100.0,
beta, 100, 100, 100, 100, true, false, alpha]",
+ // Second insert: retraction
+ "-U[1, 1000, 10.5, 2, 100, 2024-01-15T15:00, 100.0,
beta, 100, 100, 100, 100, true, false, alpha]",
+ // Second insert: aggregated result
+ // sum: 1000+2000=3000, 10.5+20.5=31.0
+ // product: 2*3=6
+ // max: 200, 2024-02-01T18:00
+ // min: 50.0, alpha
+ // first: 100, first_non_null: 200, last: 200,
last_non_null: 200
+ // bool_and: true, bool_or: true
+ // listagg: alpha|beta
+ "+U[1, 3000, 31.0, 6, 200, 2024-02-01T18:00, 50.0,
alpha, 100, 100, 200, 200, true, true, alpha|beta]",
+ // Third insert: retraction
+ "-U[1, 3000, 31.0, 6, 200, 2024-02-01T18:00, 50.0,
alpha, 100, 100, 200, 200, true, true, alpha|beta]",
+ // Third insert: final aggregated result
+ // sum: 3000+3000=6000, 31.0+30.5=61.5
+ // product: 6*5=30
+ // max: 200 (unchanged), 2024-02-01T18:00 (unchanged)
+ // min: 50.0 (unchanged), alpha (unchanged)
+ // first: 100, first_ignore_nulls: 200 (null ignored),
last: 300,
+ // last_ignore_nulls: 300
+ // bool_and: false, bool_or: true
+ // listagg: alpha|beta|gamma
+ "+U[1, 6000, 61.5, 30, 200, 2024-02-01T18:00, 50.0,
alpha, 100, 100, 300, 300, false, true, alpha|beta|gamma]");
+
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+ }
+
private InsertAndExpectValues rowsToInsertInto(Collection<String>
partitions) {
List<String> insertValues = new ArrayList<>();
List<String> expectedValues = new ArrayList<>();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
new file mode 100644
index 000000000..62e861da7
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import org.apache.fluss.metadata.AggFunctions;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FlinkAggFunctionParser}. */
+class FlinkAggFunctionParserTest {
+
+ @Test
+ void testParseNoAggFunction() {
+ Configuration options = new Configuration();
+ assertThat(FlinkAggFunctionParser.parseAggFunction("total",
options)).isEmpty();
+ }
+
+ @Test
+ void testParseFunctionWithoutParameters() {
+ Configuration options = new Configuration();
+ options.setString("fields.total.agg", "sum");
+
+ Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("total", options);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().getType()).isEqualTo(AggFunctionType.SUM);
+ assertThat(result.get().hasParameters()).isFalse();
+ }
+
+ @Test
+ void testParseFunctionWithParameters() {
+ Configuration options = new Configuration();
+ options.setString("fields.tags.agg", "listagg");
+ options.setString("fields.tags.listagg.delimiter", ";");
+
+ Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("tags", options);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+ assertThat(result.get().getParameter("delimiter")).contains(";");
+
+ Map<String, String> newOptions = new HashMap<>();
+ FlinkAggFunctionParser.formatAggFunctionToOptions(
+ "tags", AggFunctions.LISTAGG(";"), newOptions);
+ assertThat(newOptions).isEqualTo(options.toMap());
+ }
+
+ @Test
+ void testParseFunctionWithUpperCaseName() {
+ Configuration options = new Configuration();
+ options.setString("fields.tags.agg", "LISTAGG");
+ options.setString("fields.tags.listagg.delimiter", ";");
+
+ Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("tags", options);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+ assertThat(result.get().getParameter("delimiter")).contains(";");
+ }
+
+ @Test
+ void testParseColumnNameIsolation() {
+ // Test that configurations for different columns don't interfere with
each other
+ Configuration options = new Configuration();
+ options.setString("fields.col1.agg", "sum");
+ options.setString("fields.col2.agg", "listagg");
+ options.setString("fields.col2.listagg.delimiter", "|"); // This
should not affect col1
+
+ Optional<AggFunction> col1Func =
FlinkAggFunctionParser.parseAggFunction("col1", options);
+ Optional<AggFunction> col2Func =
FlinkAggFunctionParser.parseAggFunction("col2", options);
+ Optional<AggFunction> col3Func =
FlinkAggFunctionParser.parseAggFunction("col3", options);
+
+ // col1 should have SUM without parameters
+ assertThat(col1Func).isPresent();
+ assertThat(col1Func.get().getType()).isEqualTo(AggFunctionType.SUM);
+ assertThat(col1Func.get().hasParameters()).isFalse();
+
+ // col2 should have LISTAGG with delimiter parameter
+ assertThat(col2Func).isPresent();
+
assertThat(col2Func.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+ assertThat(col2Func.get().getParameter("delimiter")).contains("|");
+
+ // col3 should have no configuration
+ assertThat(col3Func).isEmpty();
+ }
+
+ @Test
+ void testParseInvalidFunctionName() {
+ Configuration options = new Configuration();
+ options.setString("fields.total.agg", "invalid_function");
+
+ assertThatThrownBy(() ->
FlinkAggFunctionParser.parseAggFunction("total", options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unknown aggregation function")
+ .hasMessageContaining("invalid_function");
+ }
+
+ @Test
+ void testParseEmptyFunctionName() {
+ Configuration options = new Configuration();
+ options.setString("fields.total.agg", "");
+
+ assertThatThrownBy(() ->
FlinkAggFunctionParser.parseAggFunction("total", options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Empty aggregation function name");
+ }
+
+ @Test
+ void testRoundTripConversion() {
+ // Test that parse and format are inverse operations
+ Map<String, String> options = new HashMap<>();
+
+ // Format functions to options
+ FlinkAggFunctionParser.formatAggFunctionToOptions("col1",
AggFunctions.SUM(), options);
+ FlinkAggFunctionParser.formatAggFunctionToOptions(
+ "col2", AggFunctions.LISTAGG(";"), options);
+
+ // Parse them back
+ Configuration config = Configuration.fromMap(options);
+ Optional<AggFunction> col1Func =
FlinkAggFunctionParser.parseAggFunction("col1", config);
+ Optional<AggFunction> col2Func =
FlinkAggFunctionParser.parseAggFunction("col2", config);
+
+ // Verify they match the original functions
+ assertThat(col1Func).isPresent();
+ assertThat(col1Func.get()).isEqualTo(AggFunctions.SUM());
+
+ assertThat(col2Func).isPresent();
+ assertThat(col2Func.get()).isEqualTo(AggFunctions.LISTAGG(";"));
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index b4546701e..a467bb14a 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -467,6 +467,55 @@ public class FlinkConversionsTest {
checkEqualsIgnoreSchema(convertedFlinkMaterializedTable,
expectedTable);
}
+ @Test
+ void testAggregationFunctionRoundTrip() {
+ // Test Flink → Fluss → Flink conversion preserves aggregation
functions.
+ ResolvedSchema schema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical(
+ "id",
org.apache.flink.table.api.DataTypes.INT().notNull()),
+ Column.physical(
+ "sum_val",
org.apache.flink.table.api.DataTypes.INT()),
+ Column.physical(
+ "tags",
org.apache.flink.table.api.DataTypes.STRING())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("PK_id",
Collections.singletonList("id")));
+
+ Map<String, String> options = new HashMap<>();
+ options.put("table.merge-engine", "aggregation");
+ options.put("fields.sum_val.agg", "sum");
+ options.put("fields.tags.agg", "listagg");
+ options.put("fields.tags.listagg.delimiter", "|");
+
+ CatalogTable flinkTable =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(schema).build(),
+ "round trip test",
+ Collections.emptyList(),
+ options);
+
+ // Flink → Fluss
+ TableDescriptor flussTable =
+ FlinkConversions.toFlussTable(new
ResolvedCatalogTable(flinkTable, schema));
+
+ // Fluss → Flink
+ TablePath tablePath = TablePath.of("db", "table");
+ long currentMillis = System.currentTimeMillis();
+ TableInfo tableInfo =
+ TableInfo.of(
+ tablePath,
+ 1L,
+ 1,
+ flussTable.withBucketCount(1),
+ currentMillis,
+ currentMillis);
+ CatalogTable convertedFlinkTable = (CatalogTable)
FlinkConversions.toFlinkTable(tableInfo);
+
+ // Verify aggregation functions are preserved
+
assertThat(convertedFlinkTable.getOptions()).containsAllEntriesOf(options);
+ }
+
/** Test refresh handler for testing purpose. */
public static class TestRefreshHandler implements RefreshHandler {
diff --git a/website/docs/table-design/merge-engines/aggregation.md
b/website/docs/table-design/merge-engines/aggregation.md
index c416c8a39..b328dc2b7 100644
--- a/website/docs/table-design/merge-engines/aggregation.md
+++ b/website/docs/table-design/merge-engines/aggregation.md
@@ -53,6 +53,13 @@ Specify the aggregate function for each non-primary key
field using connector op
'fields.<field-name>.agg' = '<function-name>'
```
+For functions that require parameters (e.g., `listagg` with a custom delimiter):
+
+```sql
+'fields.<field-name>.agg' = '<function-name>',
+'fields.<field-name>.<function-name>.<param-name>' = '<param-value>'
+```
+
</TabItem>
<TabItem value="java-client" label="Java Client">
@@ -714,7 +721,7 @@ CREATE TABLE test_listagg (
'table.merge-engine' = 'aggregation',
'fields.tags1.agg' = 'listagg',
'fields.tags2.agg' = 'listagg',
- 'fields.tags2.delimiter' = ';' -- Specify delimiter inline
+ 'fields.tags2.listagg.delimiter' = ';' -- Specify delimiter as parameter
);
INSERT INTO test_listagg VALUES
@@ -776,7 +783,7 @@ CREATE TABLE test_string_agg (
'table.merge-engine' = 'aggregation',
'fields.tags1.agg' = 'string_agg',
'fields.tags2.agg' = 'string_agg',
- 'fields.tags2.delimiter' = ';' -- Specify delimiter inline
+ 'fields.tags2.string_agg.delimiter' = ';' -- Specify delimiter as
parameter
);
INSERT INTO test_string_agg VALUES