wuchong commented on code in PR #2255:
URL: https://github.com/apache/fluss/pull/2255#discussion_r2649712131


##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share the same primary key, it aggregates each value field with the incoming data according to the field's specified aggregate function.
+
+Each field that is not part of the primary key can be assigned an aggregate function using the Schema API (recommended) or connector options (`'fields.<field-name>.agg'`). If no function is specified for a field, it defaults to `last_value_ignore_nulls` aggregation.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using 
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage" section below). The connector option is provided as an alternative for connector-specific scenarios.

Review Comment:
   I think it's hard to tell which way is recommended, as it depends on the 
client users are working with:  
   - For **Java clients**, the `Schema` API is natural.  
   - For **compute engines** like Flink or Spark, DDL and connector options are 
more typical.
   
   Given that most Fluss users interact with it through engines like **Flink** 
or **Spark**, I suggest **prioritizing Flink SQL examples** in the 
documentation.
   
   We can use **Tabs** to show both Flink SQL and Java Client examples side by 
side, with **Flink SQL as the default tab**.  
   You can refer to `datastream.mdx` for an example of how to implement tabs in 
our Markdown documentation.
   
   Besides, we can use `INSERT INTO ... VALUES ...` and `SELECT * FROM ...` in the Flink SQL examples to show the input and result data, which is easier to demo for users.



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share the same primary key, it aggregates each value field with the incoming data according to the field's specified aggregate function.
+
+Each field that is not part of the primary key can be assigned an aggregate function using the Schema API (recommended) or connector options (`'fields.<field-name>.agg'`). If no function is specified for a field, it defaults to `last_value_ignore_nulls` aggregation.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using 
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage" section below). The connector option is provided as an alternative for connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+Schema schema = Schema.newBuilder()
+    .column("product_id", DataTypes.BIGINT())
+    .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("last_update_time", DataTypes.TIMESTAMP(3))  // Defaults to 
LAST_VALUE_IGNORE_NULLS
+    .primaryKey("product_id")
+    .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
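+// (row(...) is illustrative shorthand for building a row that matches the table schema)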
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers 
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+    .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01 
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the 
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored, previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had a null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+    .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all 
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .column("verified_at", DataTypes.TIMESTAMP(3), 
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1, '[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use 
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a 
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+    .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### string_agg
+
+Alias for `listagg`. Concatenates multiple string values into a single string 
with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Same as `listagg` - concatenates values using the specified 
delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use 
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a 
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("tags", DataTypes.STRING(), AggFunction.STRING_AGG)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+    .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### bool_and
+
+Evaluates whether all boolean values in a set are true (logical AND).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true only if all values are true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("has_all_permissions", DataTypes.BOOLEAN(), AggFunction.BOOL_AND)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, true), (1, true), (1, false)
+// Result: (1, false) -- Not all values are true
+```
+
+### bool_or
+
+Checks if at least one boolean value in a set is true (logical OR).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true if any value is true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("has_any_alert", DataTypes.BOOLEAN(), AggFunction.BOOL_OR)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, false), (1, false), (1, true)
+// Result: (1, true) -- At least one value is true
+```
+
+## Advanced Configuration
+
+### Default Aggregate Function
+
+Non-primary-key fields that don't have an explicitly specified aggregate function fall back to the default, `last_value_ignore_nulls`:
+
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("col1", DataTypes.STRING())  // Defaults to LAST_VALUE_IGNORE_NULLS
+    .column("col2", DataTypes.BIGINT(), AggFunction.SUM)  // Explicitly set to 
SUM
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+```
+
+In this example:
+- `col2` uses `sum` aggregation (explicitly specified)
+- `col1` uses `last_value_ignore_nulls` as the default
+
+### Partial Update with Aggregation
+
+The aggregation merge engine supports partial updates through the `UpsertWriter` API. When performing a partial update:
+
+- **Target columns**: These columns will be aggregated according to their 
configured aggregate functions
+- **Non-target columns**: These columns will retain their existing values from 
the old row
+
+**Example:**
+
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("count1", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("count2", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("sum1", DataTypes.DOUBLE(), AggFunction.SUM)
+    .column("sum2", DataTypes.DOUBLE(), AggFunction.SUM)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Create partial update writer targeting only id, count1, and sum1
+int[] targetColumns = new int[]{0, 1, 3}; // id, count1, sum1
+UpsertWriter partialWriter = table.newUpsert()
+    .withPartialUpdate(targetColumns)

Review Comment:
   Use the name-based `.partialUpdate("id", "count1", "sum1")`, which is more straightforward.
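
   For illustration, a sketch of how the doc example could read with the name-based variant (assuming the `partialUpdate(String...)` method suggested above):

   ```java
   // Hypothetical name-based partial update targeting id, count1, and sum1
   UpsertWriter partialWriter = table.newUpsert()
       .partialUpdate("id", "count1", "sum1")
       .createWriter();

   // count1 and sum1 are aggregated with existing values;
   // count2 and sum2 keep their previous values.
   ```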



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4

Review Comment:
   ```suggestion
   sidebar_position: 5
   ```
   
   



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share the same primary key, it aggregates each value field with the incoming data according to the field's specified aggregate function.
+
+Each field that is not part of the primary key can be assigned an aggregate function using the Schema API (recommended) or connector options (`'fields.<field-name>.agg'`). If no function is specified for a field, it defaults to `last_value_ignore_nulls` aggregation.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using 
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage" section below). The connector option is provided as an alternative for connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+Schema schema = Schema.newBuilder()
+    .column("product_id", DataTypes.BIGINT())
+    .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("last_update_time", DataTypes.TIMESTAMP(3))  // Defaults to 
LAST_VALUE_IGNORE_NULLS
+    .primaryKey("product_id")
+    .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
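+// (row(...) is illustrative shorthand for building a row that matches the table schema)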
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers 
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+    .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01 
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the 
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored, previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had a null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+    .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all 
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .column("verified_at", DataTypes.TIMESTAMP(3), 
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1, '[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use 
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a 
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")

Review Comment:
   This is an issue I didn’t anticipate when designing the Schema API. Since `delimiter` is an **aggregate-specific parameter** (and we may introduce more such parameters in the future), it logically belongs in the **definition of the aggregate function itself**, not in a separate table property.
   
   However, this introduces two concerns:
   
   1. **It breaks our original design principle**: previously, aggregate 
functions were defined purely in terms of columns. Now, the function definition 
is split between the schema and table properties, reducing cohesion.
   2. **Naming conflict**: the property name is overly long and clashes with 
the existing Flink connector option `fields.xxx.listagg-delimiter`, which could 
cause confusion or redundancy.
   
   I see two viable paths forward:
   
   **Option 1: Remove the concept of “aggregate column”**
   - The schema would only describe *column names* and *types*.
   - All aggregate function metadata (including `delimiter`) would live in 
**table properties**.
   - ✅ *Benefits*: cleaner separation, better alignment across APIs (Java, 
Flink, Spark, Trino, etc.), and easier extensibility for future engines.
   
   **Option 2: Introduce a richer aggregate function model (similar to 
`DataTypes`)**  
   - Rename `enum AggFunction` → `AggFunctionType`.
   - Introduce a class `AggFunction` that can carry parameters (e.g., 
`delimiter`).
   - Add a utility class `AggFunctions` with methods like:
     ```java
     AggFunctions.SUM()
     AggFunctions.LISTAGG(";")
     ```
   - Allow inline definition in the schema builder:
     ```java
     .column("tags", DataTypes.STRING(), AggFunctions.LISTAGG(";"))
     ```
   - The Flink connector options would still translate into these schema 
builder calls.
   - ✅ *Benefits*: expressive, type-safe, and keeps logic co-located with the 
column definition.
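
   For concreteness, a minimal sketch of what **Option 1** could look like on the Java side (property names are illustrative, borrowing the existing `fields.<field-name>.agg` connector option; not a final design):

   ```java
   // Hypothetical Option 1: the schema carries only column names and types;
   // all aggregate function metadata lives in table properties.
   Schema schema = Schema.newBuilder()
       .column("id", DataTypes.BIGINT())
       .column("tags", DataTypes.STRING())
       .primaryKey("id")
       .build();

   TableDescriptor.builder()
       .schema(schema)
       .property("table.merge-engine", "aggregation")
       .property("fields.tags.agg", "listagg")          // illustrative property name
       .property("fields.tags.listagg-delimiter", ";")  // illustrative property name
       .build();
   ```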
   
   I’m fine with either approach. Which one do you prefer?



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share the same primary key, it aggregates each value field with the incoming data according to the field's specified aggregate function.
+
+Each field that is not part of the primary key can be assigned an aggregate function using the Schema API (recommended) or connector options (`'fields.<field-name>.agg'`). If no function is specified for a field, it defaults to `last_value_ignore_nulls` aggregation.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using 
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage" section below). The connector option is provided as an alternative for connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+Schema schema = Schema.newBuilder()
+    .column("product_id", DataTypes.BIGINT())
+    .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("last_update_time", DataTypes.TIMESTAMP(3))  // Defaults to 
LAST_VALUE_IGNORE_NULLS
+    .primaryKey("product_id")
+    .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
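+// (row(...) is illustrative shorthand for building a row that matches the table schema)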
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers 
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+    .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01 
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the 
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored, previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had a null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+    .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all 
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .column("verified_at", DataTypes.TIMESTAMP(3), 
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1, '[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use 
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a 
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+    .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')

Review Comment:
   It would be better to include an additional `LISTAGG` column that uses the 
**default behavior** (e.g., comma as delimiter). Since most users will rely on 
the defaults, demonstrating the expected output in this common case improves 
usability and sets clear expectations.
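
   For example, the snippet could be extended along these lines (a sketch; the added `labels` column keeps the default comma delimiter):

   ```java
   Schema schema = Schema.newBuilder()
       .column("id", DataTypes.BIGINT())
       .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)    // custom ';' delimiter
       .column("labels", DataTypes.STRING(), AggFunction.LISTAGG)  // default ',' delimiter
       .primaryKey("id")
       .build();

   TableDescriptor.builder()
       .schema(schema)
       .property("table.merge-engine", "aggregation")
       .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
       .build();

   // Input:  (1, 'developer', 'backend'), (1, 'java', 'infra')
   // Result: (1, 'developer;java', 'backend,infra')
   ```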



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share the same primary key, it aggregates each value field with the incoming data according to the field's specified aggregate function.
+
+Each field that is not part of the primary key can be assigned an aggregate function using the Schema API (recommended) or connector options (`'fields.<field-name>.agg'`). If no function is specified for a field, it defaults to `last_value_ignore_nulls` aggregation.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using 
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage" section below). The connector option is provided as an alternative for connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+Schema schema = Schema.newBuilder()
+    .column("product_id", DataTypes.BIGINT())
+    .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("last_update_time", DataTypes.TIMESTAMP(3))  // Defaults to 
LAST_VALUE_IGNORE_NULLS
+    .primaryKey("product_id")
+    .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
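+// (row(...) is illustrative shorthand for building a row that matches the table schema)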
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers 
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL

Review Comment:
   ```suggestion
   - **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, 
`DOUBLE`, `DECIMAL`
   ```
   
   nit: use code format for the data types. Same for the others.



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share the same primary key, it aggregates each value field with the incoming data according to the field's specified aggregate function.
+
+Each field that is not part of the primary key can be assigned an aggregate function using the Schema API (recommended) or connector options (`'fields.<field-name>.agg'`). If no function is specified for a field, it defaults to `last_value_ignore_nulls` aggregation.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using 
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage" section below). The connector option is provided as an alternative for connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+Schema schema = Schema.newBuilder()
+    .column("product_id", DataTypes.BIGINT())
+    .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("last_update_time", DataTypes.TIMESTAMP(3))  // Defaults to 
LAST_VALUE_IGNORE_NULLS
+    .primaryKey("product_id")
+    .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
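+// (row(...) is illustrative shorthand for building a row that matches the table schema)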
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers 
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+    .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, 
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+    .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01 
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the 
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored, previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had a null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+    .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all 
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .column("verified_at", DataTypes.TIMESTAMP(3), 
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1, '[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use 
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a 
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+    .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### string_agg
+
+Alias for `listagg`. Concatenates multiple string values into a single string 
with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Same as `listagg` - concatenates values using the specified 
delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use 
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a 
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("tags", DataTypes.STRING(), AggFunction.STRING_AGG)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+    .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### bool_and
+
+Evaluates whether all boolean values in a set are true (logical AND).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true only if all values are true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("has_all_permissions", DataTypes.BOOLEAN(), AggFunction.BOOL_AND)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, true), (1, true), (1, false)
+// Result: (1, false) -- Not all values are true
+```
+
+### bool_or
+
+Checks if at least one boolean value in a set is true (logical OR).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true if any value is true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("has_any_alert", DataTypes.BOOLEAN(), AggFunction.BOOL_OR)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, false), (1, false), (1, true)
+// Result: (1, true) -- At least one value is true
+```
+
+## Advanced Configuration
+
+### Default Aggregate Function
+
+Non-primary-key fields that don't have an explicitly specified function fall back to the default aggregate function, `last_value_ignore_nulls`:
+
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("col1", DataTypes.STRING())  // Defaults to LAST_VALUE_IGNORE_NULLS
+    .column("col2", DataTypes.BIGINT(), AggFunction.SUM)  // Explicitly set to 
SUM
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+```
+
+In this example:
+- `col2` uses `sum` aggregation (explicitly specified)
+- `col1` uses `last_value_ignore_nulls` as the default
+
+### Partial Update with Aggregation
+
+The aggregation merge engine supports partial updates through the UpsertWriter API. When performing a partial update:
+
+- **Target columns**: These columns will be aggregated according to their configured aggregate functions
+- **Non-target columns**: These columns will retain their existing values from the old row
+
+**Example:**
+
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("count1", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("count2", DataTypes.BIGINT(), AggFunction.SUM)
+    .column("sum1", DataTypes.DOUBLE(), AggFunction.SUM)
+    .column("sum2", DataTypes.DOUBLE(), AggFunction.SUM)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Create partial update writer targeting only id, count1, and sum1
+int[] targetColumns = new int[]{0, 1, 3}; // id, count1, sum1
+UpsertWriter partialWriter = table.newUpsert()
+    .withPartialUpdate(targetColumns)
+    .createWriter();
+
+// When writing:
+// - count1 and sum1 will be aggregated with existing values
+// - count2 and sum2 will remain unchanged
+```
+
+**Use cases for partial aggregation**:
+1. **Independent metrics**: When different processes update different subsets of metrics for the same key
+2. **Reduced data transfer**: Only send the columns that need to be updated
+3. **Flexible pipelines**: Different data sources can contribute to different aggregated fields
+
+### Delete Behavior
+
+The aggregation merge engine provides limited support for delete operations. You can configure the behavior using the `'table.agg.remove-record-on-delete'` option:
+
+```java
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .property("table.agg.remove-record-on-delete", "true")  // Default is false
+    .build();
+```
+
+**Configuration options**:
+- **`'table.agg.remove-record-on-delete' = 'false'`** (default): Delete operations will cause an error
+- **`'table.agg.remove-record-on-delete' = 'true'`**: Delete operations will remove the entire record from the table

Review Comment:
   This configuration appears to serve the same purpose as (and may conflict with) the existing `table.delete.behavior` setting, which defaults to `ALLOW` for regular tables, but defaults to `IGNORE` for the `FIRST_ROW` and `VERSIONED` merge engines.
   
   I suggest we **reuse** `table.delete.behavior` and simply extend its logic to also default to `IGNORE` for the **aggregation merge engine**, rather than introducing a new, overlapping configuration. This keeps the configuration surface consistent and avoids ambiguity.
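
A minimal sketch of what this proposal would look like in the table definition, assuming `table.delete.behavior` accepts the `ALLOW`/`IGNORE` values named above; this illustrates the suggested direction, not the API in this PR:

```java
// Sketch of the reviewer's suggestion, not the current Fluss API:
// reuse the existing table.delete.behavior option instead of adding
// a new agg-specific table.agg.remove-record-on-delete option.
TableDescriptor.builder()
    .schema(schema)
    .property("table.merge-engine", "aggregation")
    // assumed value, mirroring the IGNORE default described for the
    // FIRST_ROW and VERSIONED merge engines
    .property("table.delete.behavior", "IGNORE")
    .build();
```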



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+:::note
+**Current Limitation**: The aggregation merge engine does not support retraction semantics (e.g., subtracting from a sum, reverting a max). Delete operations can only remove the entire record or be rejected.
+
+Future versions may support fine-grained retraction by enhancing the protocol to carry row data with delete operations.
+:::
+
+## Performance Considerations
+
+1. **Choose Appropriate Aggregate Functions**: Select functions that match your use case to avoid unnecessary computations
+2. **Primary Key Design**: Use appropriate primary keys to ensure proper grouping of aggregated data
+3. **Null Handling**: Be aware of how each function handles null values to avoid unexpected results
+4. **Delete Handling**: If you need to handle delete operations, be aware that enabling `'table.agg.remove-record-on-delete' = 'true'` will remove entire records rather than retracting aggregated values
+
+## Limitations
+
+:::warning Critical Limitations
+When using the `aggregation` merge engine, be aware of the following critical limitations:
+
+### 1. Exactly-Once Semantics
+
+**The Fluss engine does not natively support transactional writes, and therefore does not directly provide exactly-once semantics at the storage layer.**
+
+Exactly-once semantics should be achieved through integration with compute engines (e.g., Flink, Spark). For example, after a failover, undo operations can be generated for invalid writes to roll the table back.
+
+For detailed information about the exactly-once implementation, please refer to [FIP-21: Aggregation Merge Engine](https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine)

Review Comment:
   From a user’s perspective, the current wording may give the impression that Fluss’s aggregate merge engine **does not support exactly-once semantics at all**. To avoid confusion, we should first clarify this at the beginning of the section:
   
   - **When writing to an aggregate merge engine table using the Flink engine**, Fluss **does provide exactly-once guarantees**. Thanks to Flink’s checkpointing mechanism, in the event of a failure and recovery, the Flink connector automatically performs an **undo operation** to roll back the table state to what it was at the last successful checkpoint. This ensures no over-counting or under-counting: data remains consistent and accurate.
   
   And then to mention the limitation:
   
   - **However, when using the Fluss client API directly** (outside of Flink), **exactly-once is not provided out of the box**. In such cases, users must implement their own recovery logic (similar to what the Flink connector does) by explicitly resetting the table state to a previous version through undo operations.
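
To make the Flink-side guarantee concrete, here is a minimal sketch of enabling checkpointing in a Flink job that writes to an aggregation table. The catalog registration and table name are placeholders, and the undo-based rollback itself is performed by the Fluss connector, not by user code:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing is what enables the undo-based rollback described above:
// on recovery, the connector resets the table to the last successful checkpoint.
env.enableCheckpointing(60_000L); // checkpoint every 60 seconds

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// ... register the Fluss catalog here (placeholder), then write to the
// aggregation table, e.g.:
// tEnv.executeSql("INSERT INTO product_stats VALUES (1, 30.2, 20, ...)");
```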
   



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+
+### 2. Delete Operations
+
+By default, delete operations will cause errors:
+- You must set `'table.agg.remove-record-on-delete' = 'true'` if you need to handle delete operations
+- This configuration will remove the entire aggregated record, not reverse individual aggregations
+
+### 3. Data Type Restrictions
+
+Each aggregate function supports specific data types (see the function documentation above).
+:::

Review Comment:
   I don’t consider this a true limitation; after all, aggregate functions in compute engines (like Flink or Spark) also don’t support all data types universally. I suggest we **remove this item** from the list of limitations.
   
   Separately, we should **explicitly document the limitation around retraction messages** (i.e., handling of `UPDATE_BEFORE` messages). It’s important to clarify which aggregate functions support retraction and which do not, as this directly impacts correctness in streaming workloads.
   
   I think we can also add **Retraction Handling** to each aggregate function’s documentation, and include a brief description in the "Limitations" section.
   
   For reference, Paimon provides a clear treatment of this topic here:
   
   https://paimon.apache.org/docs/master/primary-key-table/merge-engine/aggregation/#retraction
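
As a toy illustration of the correctness point (plain Java, not Fluss API): a `sum` aggregate is invertible and can honor an `UPDATE_BEFORE` retraction, while `max` cannot:

```java
// sum is invertible: a retraction subtracts exactly what was added.
long sum = 0;
sum += 5;  // UPDATE_AFTER (+5)
sum -= 5;  // UPDATE_BEFORE retracts it exactly; sum is back to 0

// max is not invertible: once a value is folded in, retracting it would
// require the full input history.
long max = Long.MIN_VALUE;
max = Math.max(max, 5);
// If 5 was the current max and an UPDATE_BEFORE retracts it, the previous
// max is unknown without replaying all remaining inputs.
```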
   
   



##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+    .column("id", DataTypes.BIGINT())
+    .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+    .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+    .primaryKey("id")
+    .build();
+
+TableDescriptor.builder()
+    .schema(schema)
+    .property("table.merge-engine", "aggregation")
+    .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01 11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```

Review Comment:
   it's better to insert a `null` for one of the `last_value` columns to demonstrate the null handling.
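
A sketch of the revised example the reviewer is asking for, reusing the `row(...)` convention from the file's earlier examples (`ts(...)` is an illustrative timestamp helper, not a Fluss API):

```java
UpsertWriter writer = table.newUpsert().createWriter();
writer.upsert(row(1L, "online", ts("2024-01-01 10:00:00")));
writer.upsert(row(1L, null, ts("2024-01-01 11:00:00"))); // null status

writer.flush();
// Result: (1, null, '2024-01-01 11:00:00')
// last_value overwrites 'online' with null; last_value_ignore_nulls would
// have kept 'online'.
```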



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
