This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d10cb3590 [flink] Flink integration with Aggregation Merge Engine (#2307)
d10cb3590 is described below

commit d10cb3590aeeac00f1480ba879d09c4526b65a45
Author: Yang Wang <[email protected]>
AuthorDate: Tue Jan 20 16:38:09 2026 +0800

    [flink] Flink integration with Aggregation Merge Engine (#2307)
---
 .../fluss/flink/catalog/FlinkTableFactory.java     |   7 +-
 .../fluss/flink/utils/FlinkAggFunctionParser.java  | 207 +++++++++++++++++++++
 .../apache/fluss/flink/utils/FlinkConversions.java |  72 ++++++-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 130 +++++++++++++
 .../flink/utils/FlinkAggFunctionParserTest.java    | 155 +++++++++++++++
 .../fluss/flink/utils/FlinkConversionsTest.java    |  49 +++++
 .../docs/table-design/merge-engines/aggregation.md |  11 +-
 7 files changed, 618 insertions(+), 13 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index e50522c45..48d74e712 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -93,7 +93,8 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig tableOptions = helper.getOptions();
         Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
-        List<String> prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client."));
+        List<String> prefixesToSkip =
+                new ArrayList<>(Arrays.asList("table.", "client.", "fields."));
         datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + "."));
         helper.validateExcept(prefixesToSkip.toArray(new String[0]));
 
@@ -161,9 +162,9 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
         final ReadableConfig tableOptions = helper.getOptions();
         Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
         if (datalakeFormat.isPresent()) {
-            helper.validateExcept("table.", "client.", datalakeFormat.get() + ".");
+            helper.validateExcept("table.", "client.", "fields.", datalakeFormat.get() + ".");
         } else {
-            helper.validateExcept("table.", "client.");
+            helper.validateExcept("table.", "client.", "fields.");
         }
 
         boolean isStreamingMode =
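
The factory change above whitelists the new `fields.` prefix so that per-column keys such as `'fields.tags.agg'` pass Flink's option validation instead of being rejected as unknown. A minimal standalone sketch of the prefix-skip idea (plain Java; `isSkipped` is a hypothetical stand-in for what `FactoryUtil.TableFactoryHelper#validateExcept` does with skipped prefixes):

```java
import java.util.Arrays;
import java.util.List;

public class PrefixSkipSketch {
    // An option key is exempt from validation when it starts with a skipped prefix.
    static boolean isSkipped(String key, List<String> prefixesToSkip) {
        return prefixesToSkip.stream().anyMatch(key::startsWith);
    }

    public static void main(String[] args) {
        List<String> skip = Arrays.asList("table.", "client.", "fields.");
        System.out.println(isSkipped("fields.tags.agg", skip)); // true
        System.out.println(isSkipped("bucket.num", skip));      // false
    }
}
```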
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
new file mode 100644
index 000000000..a781a75d5
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkAggFunctionParser.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import org.apache.fluss.metadata.AggFunctions;
+
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Parser for aggregation function configuration from Flink table properties.
+ *
+ * <p>Configuration format (column-level only):
+ *
+ * <pre>
+ * fields.&lt;column&gt;.agg = &lt;function_name&gt;
+ * fields.&lt;column&gt;.&lt;function_name&gt;.&lt;param_name&gt; = &lt;param_value&gt;
+ * </pre>
+ *
+ * <p>Examples:
+ *
+ * <pre>
+ * 'fields.tags.agg' = 'listagg'
+ * 'fields.tags.listagg.delimiter' = ';'
+ * </pre>
+ */
+public class FlinkAggFunctionParser {
+
+    private static final String AGG_PREFIX = "fields.";
+    private static final String AGG_SUFFIX = ".agg";
+
+    /**
+     * Parse aggregation function for a column.
+     *
+     * <p>Returns empty if no aggregation function is configured for this column.
+     */
+    public static Optional<AggFunction> parseAggFunction(String columnName, Configuration options) {
+
+        // Check column-level configuration: fields.<column>.agg
+        String columnFuncKey = AGG_PREFIX + columnName + AGG_SUFFIX;
+        if (!options.containsKey(columnFuncKey)) {
+            return Optional.empty();
+        }
+
+        // convert to lower case for consistent parameter option keys
+        String funcName = options.getString(columnFuncKey, null).toLowerCase();
+        AggFunctionType type = parseAggFunctionType(funcName, columnName);
+
+        // Collect column-level parameters: fields.<column>.<function_name>.*
+        Map<String, String> params =
+                collectParameters(AGG_PREFIX + columnName + "." + funcName + ".", options);
+
+        return Optional.of(createAggFunction(type, params, columnName));
+    }
+
+    /**
+     * Format aggregation function to Flink table options.
+     *
+     * <p>Converts: AggFunction → Map&lt;String, String&gt;
+     *
+     * <p>Output format:
+     *
+     * <pre>
+     * fields.&lt;column&gt;.agg = &lt;function_name&gt;
+     * fields.&lt;column&gt;.&lt;function_name&gt;.&lt;param&gt; = &lt;value&gt;
+     * </pre>
+     */
+    public static void formatAggFunctionToOptions(
+            String columnName, AggFunction aggFunction, Map<String, String> options) {
+        // Set function name
+        String funcKey = AGG_PREFIX + columnName + AGG_SUFFIX;
+        // funcName is already in lower case
+        String funcName = aggFunction.getType().toString();
+        options.put(funcKey, funcName);
+
+        // Set parameters: fields.<column>.<function_name>.<param>
+        for (Map.Entry<String, String> param : aggFunction.getParameters().entrySet()) {
+            String paramKey = AGG_PREFIX + columnName + "." + funcName + "." + param.getKey();
+            options.put(paramKey, param.getValue());
+        }
+    }
+
+    /**
+     * Parse aggregation function type from string.
+     *
+     * @throws IllegalArgumentException if function name is invalid
+     */
+    @VisibleForTesting
+    static AggFunctionType parseAggFunctionType(@Nullable String funcName, String columnName) {
+        if (funcName == null || funcName.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                    String.format("Empty aggregation function name for column '%s'", columnName));
+        }
+
+        AggFunctionType type = AggFunctionType.fromString(funcName.trim());
+        if (type == null) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Unknown aggregation function for column '%s': %s\n"
+                                    + "Supported functions: %s",
+                            columnName, funcName, getSupportedFunctionNames()));
+        }
+
+        return type;
+    }
+
+    /**
+     * Get a comma-separated list of all supported aggregation function names.
+     *
+     * <p>This method dynamically generates the list from the AggFunctionType enum, ensuring that
+     * the error message automatically includes any newly added functions without requiring manual
+     * updates.
+     *
+     * @return comma-separated list of supported function names
+     */
+    private static String getSupportedFunctionNames() {
+        return Arrays.stream(AggFunctionType.values())
+                .map(AggFunctionType::toString)
+                .sorted()
+                .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Collect parameters with given prefix.
+     *
+     * <p>For prefix "fields.tags.listagg.", collects:
+     *
+     * <ul>
+     *   <li>fields.tags.listagg.delimiter → delimiter
+     *   <li>fields.tags.listagg.xxx → xxx
+     * </ul>
+     */
+    @VisibleForTesting
+    static Map<String, String> collectParameters(String prefix, Configuration options) {
+        Map<String, String> params = new HashMap<>();
+
+        for (String key : options.keySet()) {
+            if (key.startsWith(prefix)) {
+                String paramName = key.substring(prefix.length());
+                String paramValue = options.getString(key, null);
+                if (paramValue != null) {
+                    params.put(paramName, paramValue);
+                }
+            }
+        }
+
+        return params;
+    }
+
+    /**
+     * Create AggFunction from type and parameters.
+     *
+     * <p>This method uses the generic factory method from {@link AggFunctions} to create
+     * aggregation functions, making it independent of specific function implementations. When new
+     * aggregation functions are added to the system, no changes are required here.
+     *
+     * @param type the aggregation function type
+     * @param params the function parameters (may be empty)
+     * @param columnName the column name (used for error messages)
+     * @return the created aggregation function
+     */
+    private static AggFunction createAggFunction(
+            AggFunctionType type, Map<String, String> params, String columnName) {
+        // Use generic factory method to create aggregation function
+        // This delegates parameter handling and validation to the underlying implementation
+        AggFunction aggFunction =
+                params.isEmpty() ? AggFunctions.of(type) : AggFunctions.of(type, params);
+
+        // Validate all parameters to ensure they are supported and valid
+        try {
+            aggFunction.validate();
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid aggregation function configuration for column '%s': %s",
+                            columnName, e.getMessage()),
+                    e);
+        }
+
+        return aggFunction;
+    }
+}
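
For reference, `parseAggFunction` is the entry point the connector uses to resolve a column's aggregation function from table options. A minimal usage sketch following the key convention in the class Javadoc (`Configuration` is Flink's; the parser and `AggFunction` are the classes added in this commit):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.fluss.flink.utils.FlinkAggFunctionParser;
import org.apache.fluss.metadata.AggFunction;

import java.util.Optional;

public class ParserUsageSketch {
    public static void main(String[] args) {
        Configuration options = new Configuration();
        options.setString("fields.tags.agg", "listagg");
        options.setString("fields.tags.listagg.delimiter", ";");

        // Returns Optional.empty() when no 'fields.<column>.agg' key is set.
        Optional<AggFunction> agg =
                FlinkAggFunctionParser.parseAggFunction("tags", options);
        agg.ifPresent(f -> System.out.println(f.getType())); // LISTAGG
    }
}
```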
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 5fc721c40..f2d34dc14 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -23,7 +23,9 @@ import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.config.Password;
 import org.apache.fluss.flink.adapter.CatalogTableAdapter;
 import org.apache.fluss.flink.catalog.FlinkCatalogFactory;
+import org.apache.fluss.metadata.AggFunction;
 import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -60,6 +62,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -67,6 +70,7 @@ import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
 import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.fluss.config.ConfigOptions.TABLE_MERGE_ENGINE;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.flink.FlinkConnectorOptions.AUTO_INCREMENT_FIELDS;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
@@ -127,6 +131,14 @@ public class FlinkConversions {
         }
 
         Schema schema = tableInfo.getSchema();
+
+        // convert aggregation functions to flink options
+        for (Schema.Column column : schema.getColumns()) {
+            if (column.getAggFunction().isPresent()) {
+                FlinkAggFunctionParser.formatAggFunctionToOptions(
+                        column.getName(), column.getAggFunction().get(), newOptions);
+            }
+        }
         List<String> physicalColumns = schema.getColumnNames();
         int columnCount =
                 physicalColumns.size()
@@ -197,17 +209,16 @@ public class FlinkConversions {
             schemBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns());
         }
 
-        // first build schema with physical columns
+        // Check if aggregation merge engine is enabled to optimize parsing
+        boolean isAggregationEngine = isAggregationMergeEngine(flinkTableConf);
+
+        // Build schema with physical columns
         resolvedSchema.getColumns().stream()
                 .filter(Column::isPhysical)
                 .forEachOrdered(
-                        column -> {
-                            schemBuilder
-                                    .column(
-                                            column.getName(),
-                                            FlinkConversions.toFlussType(column.getDataType()))
-                                    .withComment(column.getComment().orElse(null));
-                        });
+                        column ->
+                                addColumnToSchema(
+                                        schemBuilder, column, flinkTableConf, isAggregationEngine));
 
         // Configure auto-increment columns based on the 'auto-increment.fields' option.
         if (flinkTableConf.containsKey(AUTO_INCREMENT_FIELDS.key())) {
@@ -649,6 +660,51 @@ public class FlinkConversions {
         return builder.build();
     }
 
+    /**
+     * Check if the table uses aggregation merge engine.
+     *
+     * @param tableConf the table configuration
+     * @return true if aggregation merge engine is enabled, false otherwise
+     */
+    private static boolean isAggregationMergeEngine(Configuration tableConf) {
+        String mergeEngineStr = tableConf.getString(TABLE_MERGE_ENGINE.key(), null);
+        return mergeEngineStr != null
+                && MergeEngineType.fromString(mergeEngineStr) == MergeEngineType.AGGREGATION;
+    }
+
+    /**
+     * Add a column to the schema builder with optional aggregation function.
+     *
+     * @param schemaBuilder the schema builder
+     * @param column the Flink column
+     * @param tableConf the table configuration
+     * @param parseAggFunction whether to parse aggregation function from config
+     */
+    private static void addColumnToSchema(
+            Schema.Builder schemaBuilder,
+            Column column,
+            Configuration tableConf,
+            boolean parseAggFunction) {
+        String columnName = column.getName();
+        DataType flussDataType = toFlussType(column.getDataType());
+
+        // Parse and add aggregation function if needed
+        if (parseAggFunction) {
+            Optional<AggFunction> aggFunction =
+                    FlinkAggFunctionParser.parseAggFunction(columnName, tableConf);
+            if (aggFunction.isPresent()) {
+                schemaBuilder.column(columnName, flussDataType, aggFunction.get());
+            } else {
+                schemaBuilder.column(columnName, flussDataType);
+            }
+        } else {
+            schemaBuilder.column(columnName, flussDataType);
+        }
+
+        // Add comment if present
+        column.getComment().ifPresent(schemaBuilder::withComment);
+    }
+
     private static Map<String, String> extractCustomProperties(
             Configuration allProperties, Map<String, String> flussTableProperties) {
         Map<String, String> customProperties = new HashMap<>(allProperties.toMap());
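
The conversion changes above gate the parsing work on the merge engine, so tables without `'table.merge-engine' = 'aggregation'` skip the per-column lookup entirely. A condensed sketch of that gate (a plain string comparison stands in for the `MergeEngineType.fromString` call the real code uses):

```java
import org.apache.flink.configuration.Configuration;

public class MergeEngineGateSketch {
    // Mirrors isAggregationMergeEngine(...) from the diff above.
    static boolean isAggregationMergeEngine(Configuration tableConf) {
        String engine = tableConf.getString("table.merge-engine", null);
        return "aggregation".equalsIgnoreCase(engine);
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println(isAggregationMergeEngine(conf)); // false
        conf.setString("table.merge-engine", "aggregation");
        System.out.println(isAggregationMergeEngine(conf)); // true
    }
}
```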
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 1af582e4d..f342379b1 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1307,6 +1307,136 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @Test
+    void testComprehensiveAggregationFunctions() throws Exception {
+        // Test all 11 aggregate functions (each function tested once with a representative data type)
+        tEnv.executeSql(
+                "create table comprehensive_agg ("
+                        + "id int not null primary key not enforced, "
+                        // Numeric aggregations
+                        + "sum_int int, "
+                        + "sum_double double, "
+                        + "product_int int, "
+                        // Max/Min aggregations (representative types: int, double, string,
+                        // timestamp)
+                        + "max_int int, "
+                        + "max_timestamp timestamp(3), "
+                        + "min_double double, "
+                        + "min_string string, "
+                        // Value selection aggregations (test with/without nulls)
+                        + "first_val int, "
+                        + "first_val_non_null int, "
+                        + "last_val int, "
+                        + "last_val_non_null int, "
+                        // Boolean aggregations
+                        + "bool_and_val boolean, "
+                        + "bool_or_val boolean, "
+                        // String aggregation with custom delimiter
+                        + "listagg_val string"
+                        + ") with ("
+                        + "'table.merge-engine' = 'aggregation', "
+                        + "'fields.sum_int.agg' = 'sum', "
+                        + "'fields.sum_double.agg' = 'sum', "
+                        + "'fields.product_int.agg' = 'product', "
+                        + "'fields.max_int.agg' = 'max', "
+                        + "'fields.max_timestamp.agg' = 'max', "
+                        + "'fields.min_double.agg' = 'min', "
+                        + "'fields.min_string.agg' = 'min', "
+                        + "'fields.first_val.agg' = 'first_value', "
+                        + "'fields.first_val_non_null.agg' = 'first_value_ignore_nulls', "
+                        + "'fields.last_val.agg' = 'last_value', "
+                        + "'fields.last_val_non_null.agg' = 'last_value_ignore_nulls', "
+                        + "'fields.bool_and_val.agg' = 'bool_and', "
+                        + "'fields.bool_or_val.agg' = 'bool_or', "
+                        + "'fields.listagg_val.agg' = 'listagg', "
+                        + "'fields.listagg_val.listagg.delimiter' = '|')");
+
+        // Insert first batch - initial values
+        tEnv.executeSql(
+                        "INSERT INTO comprehensive_agg VALUES ("
+                                + "1, " // id
+                                + "1000, 10.5, " // sum_int, sum_double
+                                + "2, " // product_int
+                                + "100, TIMESTAMP '2024-01-15 15:00:00', " // max_int, max_timestamp
+                                + "100.0, 'beta', " // min_double, min_string
+                                + "100, 100, 100, 100, " // first_value, first_value_ignore_nulls,
+                                // last_value, last_value_ignore_nulls
+                                + "true, false, " // bool_and_val, bool_or_val
+                                + "'alpha'" // listagg_val
+                                + ")")
+                .await();
+
+        // Insert second batch - trigger aggregation
+        tEnv.executeSql(
+                        "INSERT INTO comprehensive_agg VALUES ("
+                                + "1, " // id
+                                + "2000, 20.5, " // sum: 1000+2000=3000, 10.5+20.5=31.0
+                                + "3, " // product: 2*3=6
+                                + "200, TIMESTAMP '2024-02-01 18:00:00', " // max: 200, 2024-02-01
+                                // 18:00
+                                + "50.0, 'alpha', " // min: 50.0, alpha
+                                + "200, 200, 200, 200, " // first: keep 100, first_ignore_nulls:
+                                // 200, last: 200, last_ignore_nulls: 200
+                                + "true, true, " // bool_and: true AND true=true, bool_or: false OR
+                                // true=true
+                                + "'beta'" // listagg: alpha|beta
+                                + ")")
+                .await();
+
+        // Insert third batch - further aggregation with null handling test
+        tEnv.executeSql(
+                        "INSERT INTO comprehensive_agg VALUES ("
+                                + "1, " // id
+                                + "3000, 30.5, " // sum: 3000+3000=6000, 31.0+30.5=61.5
+                                + "5, " // product: 6*5=30
+                                + "150, TIMESTAMP '2024-01-20 14:00:00', " // max: keep 200, keep
+                                // 2024-02-01 18:00
+                                + "80.0, 'charlie', " // min: keep 50.0, keep alpha
+                                + "300, CAST(NULL AS INT), 300, 300, " // first: keep 100, ignore
+                                // null keep 200, last: 300,
+                                // last_ignore_nulls: 300
+                                + "false, true, " // bool_and: true AND false=false, bool_or: true
+                                // OR true=true
+                                + "'gamma'" // listagg: alpha|beta|gamma
+                                + ")")
+                .await();
+
+        // Query and verify aggregated results
+        CloseableIterator<Row> rowIter =
+                tEnv.executeSql("SELECT * FROM comprehensive_agg").collect();
+
+        // Expected results: changelog with 5 records (+I, -U, +U, -U, +U)
+        List<String> expectedRows =
+                Arrays.asList(
+                        // First insert: initial values
+                        "+I[1, 1000, 10.5, 2, 100, 2024-01-15T15:00, 100.0, beta, 100, 100, 100, 100, true, false, alpha]",
+                        // Second insert: retraction
+                        "-U[1, 1000, 10.5, 2, 100, 2024-01-15T15:00, 100.0, beta, 100, 100, 100, 100, true, false, alpha]",
+                        // Second insert: aggregated result
+                        // sum: 1000+2000=3000, 10.5+20.5=31.0
+                        // product: 2*3=6
+                        // max: 200, 2024-02-01T18:00
+                        // min: 50.0, alpha
+                        // first: 100, first_non_null: 200, last: 200, last_non_null: 200
+                        // bool_and: true, bool_or: true
+                        // listagg: alpha|beta
+                        "+U[1, 3000, 31.0, 6, 200, 2024-02-01T18:00, 50.0, alpha, 100, 100, 200, 200, true, true, alpha|beta]",
+                        // Third insert: retraction
+                        "-U[1, 3000, 31.0, 6, 200, 2024-02-01T18:00, 50.0, alpha, 100, 100, 200, 200, true, true, alpha|beta]",
+                        // Third insert: final aggregated result
+                        // sum: 3000+3000=6000, 31.0+30.5=61.5
+                        // product: 6*5=30
+                        // max: 200 (unchanged), 2024-02-01T18:00 (unchanged)
+                        // min: 50.0 (unchanged), alpha (unchanged)
+                        // first: 100, first_ignore_nulls: 200 (null ignored), last: 300,
+                        // last_ignore_nulls: 300
+                        // bool_and: false, bool_or: true
+                        // listagg: alpha|beta|gamma
+                        "+U[1, 6000, 61.5, 30, 200, 2024-02-01T18:00, 50.0, alpha, 100, 100, 300, 300, false, true, alpha|beta|gamma]");
+
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+    }
+
     private InsertAndExpectValues rowsToInsertInto(Collection<String> partitions) {
         List<String> insertValues = new ArrayList<>();
         List<String> expectedValues = new ArrayList<>();
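
The expected changelog in the test above follows from folding each inserted batch into the accumulated row. A tiny standalone illustration of that folding for three of the functions (plain Java arithmetic, no Fluss APIs; inputs taken from the three INSERT batches):

```java
public class AggFoldSketch {
    public static void main(String[] args) {
        int[] sumIn = {1000, 2000, 3000};       // sum_int per batch
        int[] prodIn = {2, 3, 5};               // product_int per batch
        boolean[] andIn = {true, true, false};  // bool_and_val per batch

        int sum = 0, product = 1;
        boolean boolAnd = true;
        for (int i = 0; i < sumIn.length; i++) {
            sum += sumIn[i];      // 1000 -> 3000 -> 6000
            product *= prodIn[i]; // 2 -> 6 -> 30
            boolAnd &= andIn[i];  // true -> true -> false
        }
        // Matches the final +U row: sum_int=6000, product_int=30, bool_and_val=false
        System.out.println(sum + " " + product + " " + boolAnd);
    }
}
```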
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
new file mode 100644
index 000000000..62e861da7
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import org.apache.fluss.metadata.AggFunctions;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FlinkAggFunctionParser}. */
+class FlinkAggFunctionParserTest {
+
+    @Test
+    void testParseNoAggFunction() {
+        Configuration options = new Configuration();
+        assertThat(FlinkAggFunctionParser.parseAggFunction("total", options)).isEmpty();
+    }
+
+    @Test
+    void testParseFunctionWithoutParameters() {
+        Configuration options = new Configuration();
+        options.setString("fields.total.agg", "sum");
+
+        Optional<AggFunction> result = FlinkAggFunctionParser.parseAggFunction("total", options);
+
+        assertThat(result).isPresent();
+        assertThat(result.get().getType()).isEqualTo(AggFunctionType.SUM);
+        assertThat(result.get().hasParameters()).isFalse();
+    }
+
+    @Test
+    void testParseFunctionWithParameters() {
+        Configuration options = new Configuration();
+        options.setString("fields.tags.agg", "listagg");
+        options.setString("fields.tags.listagg.delimiter", ";");
+
+        Optional<AggFunction> result = FlinkAggFunctionParser.parseAggFunction("tags", options);
+
+        assertThat(result).isPresent();
+        assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+        assertThat(result.get().getParameter("delimiter")).contains(";");
+
+        Map<String, String> newOptions = new HashMap<>();
+        FlinkAggFunctionParser.formatAggFunctionToOptions(
+                "tags", AggFunctions.LISTAGG(";"), newOptions);
+        assertThat(newOptions).isEqualTo(options.toMap());
+    }
+
+    @Test
+    void testParseFunctionWithUpperCaseName() {
+        Configuration options = new Configuration();
+        options.setString("fields.tags.agg", "LISTAGG");
+        options.setString("fields.tags.listagg.delimiter", ";");
+
+        Optional<AggFunction> result = FlinkAggFunctionParser.parseAggFunction("tags", options);
+
+        assertThat(result).isPresent();
+        assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+        assertThat(result.get().getParameter("delimiter")).contains(";");
+    }
+
+    @Test
+    void testParseColumnNameIsolation() {
+        // Test that configurations for different columns don't interfere with each other
+        Configuration options = new Configuration();
+        options.setString("fields.col1.agg", "sum");
+        options.setString("fields.col2.agg", "listagg");
+        options.setString("fields.col2.listagg.delimiter", "|"); // This should not affect col1
+
+        Optional<AggFunction> col1Func = FlinkAggFunctionParser.parseAggFunction("col1", options);
+        Optional<AggFunction> col2Func = FlinkAggFunctionParser.parseAggFunction("col2", options);
+        Optional<AggFunction> col3Func = FlinkAggFunctionParser.parseAggFunction("col3", options);
+
+        // col1 should have SUM without parameters
+        assertThat(col1Func).isPresent();
+        assertThat(col1Func.get().getType()).isEqualTo(AggFunctionType.SUM);
+        assertThat(col1Func.get().hasParameters()).isFalse();
+
+        // col2 should have LISTAGG with delimiter parameter
+        assertThat(col2Func).isPresent();
+        assertThat(col2Func.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+        assertThat(col2Func.get().getParameter("delimiter")).contains("|");
+
+        // col3 should have no configuration
+        assertThat(col3Func).isEmpty();
+    }
+
+    @Test
+    void testParseInvalidFunctionName() {
+        Configuration options = new Configuration();
+        options.setString("fields.total.agg", "invalid_function");
+
+        assertThatThrownBy(() -> FlinkAggFunctionParser.parseAggFunction("total", options))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unknown aggregation function")
+                .hasMessageContaining("invalid_function");
+    }
+
+    @Test
+    void testParseEmptyFunctionName() {
+        Configuration options = new Configuration();
+        options.setString("fields.total.agg", "");
+
+        assertThatThrownBy(() -> FlinkAggFunctionParser.parseAggFunction("total", options))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Empty aggregation function name");
+    }
+
+    @Test
+    void testRoundTripConversion() {
+        // Test that parse and format are inverse operations
+        Map<String, String> options = new HashMap<>();
+
+        // Format functions to options
+        FlinkAggFunctionParser.formatAggFunctionToOptions("col1", AggFunctions.SUM(), options);
+        FlinkAggFunctionParser.formatAggFunctionToOptions(
+                "col2", AggFunctions.LISTAGG(";"), options);
+
+        // Parse them back
+        Configuration config = Configuration.fromMap(options);
+        Optional<AggFunction> col1Func = FlinkAggFunctionParser.parseAggFunction("col1", config);
+        Optional<AggFunction> col2Func = FlinkAggFunctionParser.parseAggFunction("col2", config);
+
+        // Verify they match the original functions
+        assertThat(col1Func).isPresent();
+        assertThat(col1Func.get()).isEqualTo(AggFunctions.SUM());
+
+        assertThat(col2Func).isPresent();
+        assertThat(col2Func.get()).isEqualTo(AggFunctions.LISTAGG(";"));
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index b4546701e..a467bb14a 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -467,6 +467,55 @@ public class FlinkConversionsTest {
         checkEqualsIgnoreSchema(convertedFlinkMaterializedTable, expectedTable);
     }
 
+    @Test
+    void testAggregationFunctionRoundTrip() {
+        // Test Flink → Fluss → Flink conversion preserves aggregation functions.
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical(
+                                        "id", org.apache.flink.table.api.DataTypes.INT().notNull()),
+                                Column.physical(
+                                        "sum_val", org.apache.flink.table.api.DataTypes.INT()),
+                                Column.physical(
+                                        "tags", org.apache.flink.table.api.DataTypes.STRING())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id")));
+
+        Map<String, String> options = new HashMap<>();
+        options.put("table.merge-engine", "aggregation");
+        options.put("fields.sum_val.agg", "sum");
+        options.put("fields.tags.agg", "listagg");
+        options.put("fields.tags.listagg.delimiter", "|");
+
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "round trip test",
+                        Collections.emptyList(),
+                        options);
+
+        // Flink → Fluss
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));
+
+        // Fluss → Flink
+        TablePath tablePath = TablePath.of("db", "table");
+        long currentMillis = System.currentTimeMillis();
+        TableInfo tableInfo =
+                TableInfo.of(
+                        tablePath,
+                        1L,
+                        1,
+                        flussTable.withBucketCount(1),
+                        currentMillis,
+                        currentMillis);
+        CatalogTable convertedFlinkTable = (CatalogTable) FlinkConversions.toFlinkTable(tableInfo);
+
+        // Verify aggregation functions are preserved
+        assertThat(convertedFlinkTable.getOptions()).containsAllEntriesOf(options);
+    }
+
     /** Test refresh handler for testing purpose. */
     public static class TestRefreshHandler implements RefreshHandler {
 
diff --git a/website/docs/table-design/merge-engines/aggregation.md b/website/docs/table-design/merge-engines/aggregation.md
index c416c8a39..b328dc2b7 100644
--- a/website/docs/table-design/merge-engines/aggregation.md
+++ b/website/docs/table-design/merge-engines/aggregation.md
@@ -53,6 +53,13 @@ Specify the aggregate function for each non-primary key field using connector op
 'fields.<field-name>.agg' = '<function-name>'
 ```
 
+For functions that require parameters (e.g., `listagg` with a custom delimiter):
+
+```sql
+'fields.<field-name>.agg' = '<function-name>',
+'fields.<field-name>.<function-name>.<param-name>' = '<param-value>'
+```
+
 </TabItem>
 <TabItem value="java-client" label="Java Client">
 
@@ -714,7 +721,7 @@ CREATE TABLE test_listagg  (
     'table.merge-engine' = 'aggregation',
     'fields.tags1.agg' = 'listagg',
     'fields.tags2.agg' = 'listagg',
-    'fields.tags2.delimiter' = ';'   -- Specify delimiter inline
+    'fields.tags2.listagg.delimiter' = ';'   -- Specify delimiter as parameter
 );
 
 INSERT INTO test_listagg VALUES
@@ -776,7 +783,7 @@ CREATE TABLE test_string_agg  (
     'table.merge-engine' = 'aggregation',
     'fields.tags1.agg' = 'string_agg',
     'fields.tags2.agg' = 'string_agg',
-    'fields.tags2.delimiter' = ';'   -- Specify delimiter inline
+    'fields.tags2.string_agg.delimiter' = ';'   -- Specify delimiter as parameter
 );
 
 INSERT INTO test_string_agg VALUES
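
Putting the documented keys together, a minimal end-to-end DDL sketch via the Flink Table API (assumes a Fluss catalog is already the session's current catalog; the table name and columns are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ListAggDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // WITH options follow the 'fields.<field>.<function>.<param>' convention.
        tEnv.executeSql(
                "CREATE TABLE tags_table ("
                        + "  id INT NOT NULL PRIMARY KEY NOT ENFORCED,"
                        + "  tags STRING"
                        + ") WITH ("
                        + "  'table.merge-engine' = 'aggregation',"
                        + "  'fields.tags.agg' = 'listagg',"
                        + "  'fields.tags.listagg.delimiter' = ';'"
                        + ")");
    }
}
```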

