wuchong commented on code in PR #2255:
URL: https://github.com/apache/fluss/pull/2255#discussion_r2649712131
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only
care about aggregated results rather than individual records. For records that
share the same primary key, it merges each value field of the incoming record
into the existing record, field by field, according to the specified aggregate
function.
+
+Each field that is not part of the primary key can be assigned an aggregate
function using the Schema API (recommended) or connector options
(`'fields.<field-name>.agg'`). If no function is specified for a field,
`last_value_ignore_nulls` aggregation is used by default.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage"
section below). The connector option is provided as an alternative for
connector-specific scenarios.
Review Comment:
I think it's hard to tell which way is recommended, as it depends on the
client users are working with:
- For **Java clients**, the `Schema` API is natural.
- For **compute engines** like Flink or Spark, DDL and connector options are
more typical.
Given that most Fluss users interact with it through engines like **Flink**
or **Spark**, I suggest **prioritizing Flink SQL examples** in the
documentation.
We can use **Tabs** to show both Flink SQL and Java Client examples side by
side, with **Flink SQL as the default tab**.
You can refer to `datastream.mdx` for an example of how to implement tabs in
our Markdown documentation.
Besides, we can use `INSERT INTO ... VALUES ...` and `SELECT * FROM ...` for
the Flink SQL examples to show the input and result data, which makes them
easier for users to follow.
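For example, the Flink SQL tab could look roughly like this (a sketch only:
the table name and columns are illustrative, and it assumes the session is
already using a Fluss catalog):
```sql
-- Sketch: assumes the current catalog is a Fluss catalog.
CREATE TABLE product_stats (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.price.agg' = 'max',
    'fields.sales.agg' = 'sum'
);

INSERT INTO product_stats VALUES (1, 23.0, 15), (1, 30.2, 20);

SELECT * FROM product_stats;
-- Expected: product_id=1, price=30.2 (max), sales=35 (sum)
```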
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only
care about aggregated results rather than individual records. For records that
share the same primary key, it merges each value field of the incoming record
into the existing record, field by field, according to the specified aggregate
function.
+
+Each field that is not part of the primary key can be assigned an aggregate
function using the Schema API (recommended) or connector options
(`'fields.<field-name>.agg'`). If no function is specified for a field,
`last_value_ignore_nulls` aggregation is used by default.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage"
section below). The connector option is provided as an alternative for
connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+
+Schema schema = Schema.newBuilder()
+ .column("product_id", DataTypes.BIGINT())
+ .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("last_update_time", DataTypes.TIMESTAMP(3)) // Defaults to
LAST_VALUE_IGNORE_NULLS
+ .primaryKey("product_id")
+ .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+ .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored; the previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+ .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .column("verified_at", DataTypes.TIMESTAMP(3),
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1,
'[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+ .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### string_agg
+
+Alias for `listagg`. Concatenates multiple string values into a single string
with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Same as `listagg` - concatenates values using the specified
delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("tags", DataTypes.STRING(), AggFunction.STRING_AGG)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+ .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### bool_and
+
+Evaluates whether all boolean values in a set are true (logical AND).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true only if all values are true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("has_all_permissions", DataTypes.BOOLEAN(), AggFunction.BOOL_AND)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, true), (1, true), (1, false)
+// Result: (1, false) -- Not all values are true
+```
+
+### bool_or
+
+Checks if at least one boolean value in a set is true (logical OR).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true if any value is true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("has_any_alert", DataTypes.BOOLEAN(), AggFunction.BOOL_OR)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, false), (1, false), (1, true)
+// Result: (1, true) -- At least one value is true
+```
+
+## Advanced Configuration
+
+### Default Aggregate Function
+
+Non-primary key fields that don't have an explicitly specified aggregate
function fall back to the default, `last_value_ignore_nulls`:
+
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("col1", DataTypes.STRING()) // Defaults to LAST_VALUE_IGNORE_NULLS
+ .column("col2", DataTypes.BIGINT(), AggFunction.SUM) // Explicitly set to
SUM
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+```
+
+In this example:
+- `col2` uses `sum` aggregation (explicitly specified)
+- `col1` uses `last_value_ignore_nulls` as the default
+
+### Partial Update with Aggregation
+
+The aggregation merge engine supports partial updates through the UpsertWriter
API. When performing a partial update:
+
+- **Target columns**: These columns will be aggregated according to their
configured aggregate functions
+- **Non-target columns**: These columns will retain their existing values from
the old row
+
+**Example:**
+
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("count1", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("count2", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("sum1", DataTypes.DOUBLE(), AggFunction.SUM)
+ .column("sum2", DataTypes.DOUBLE(), AggFunction.SUM)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Create partial update writer targeting only id, count1, and sum1
+int[] targetColumns = new int[]{0, 1, 3}; // id, count1, sum1
+UpsertWriter partialWriter = table.newUpsert()
+ .withPartialUpdate(targetColumns)
Review Comment:
Use the name-based `.partialUpdate("id", "count1", "sum1")`, which is more
straightforward.
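A sketch of what the example would become (using the method name from this
suggestion; the exact builder signature is assumed):
```java
// Name-based partial update, per the suggestion above; the exact
// signature of partialUpdate is assumed here.
UpsertWriter partialWriter = table.newUpsert()
    .partialUpdate("id", "count1", "sum1")
    .createWriter();

// As before:
// - count1 and sum1 are aggregated with existing values
// - count2 and sum2 remain unchanged
```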
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
Review Comment:
```suggestion
sidebar_position: 5
```
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only
care about aggregated results rather than individual records. For records that
share the same primary key, it merges each value field of the incoming record
into the existing record, field by field, according to the specified aggregate
function.
+
+Each field that is not part of the primary key can be assigned an aggregate
function using the Schema API (recommended) or connector options
(`'fields.<field-name>.agg'`). If no function is specified for a field,
`last_value_ignore_nulls` aggregation is used by default.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage"
section below). The connector option is provided as an alternative for
connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+
+Schema schema = Schema.newBuilder()
+ .column("product_id", DataTypes.BIGINT())
+ .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("last_update_time", DataTypes.TIMESTAMP(3)) // Defaults to
LAST_VALUE_IGNORE_NULLS
+ .primaryKey("product_id")
+ .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+ .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored; the previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+ .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .column("verified_at", DataTypes.TIMESTAMP(3),
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1,
'[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
Review Comment:
This is an issue I didn’t anticipate when designing the Schema API. Since
`delimiter` is an **aggregate-specific parameter** (and we may introduce more
such parameters in the future), it logically belongs in the **definition of the
aggregate function itself**, not in a separate table property.
However, this introduces two concerns:
1. **It breaks our original design principle**: previously, aggregate
functions were defined purely in terms of columns. Now, the function definition
is split between the schema and table properties, reducing cohesion.
2. **Naming conflict**: the property name is overly long and clashes with
the existing Flink connector option `fields.xxx.listagg-delimiter`, which could
cause confusion or redundancy.
I see two viable paths forward:
**Option 1: Remove the concept of “aggregate column”**
- The schema would only describe *column names* and *types*.
- All aggregate function metadata (including `delimiter`) would live in
**table properties**.
- ✅ *Benefits*: cleaner separation, better alignment across APIs (Java,
Flink, Spark, Trino, etc.), and easier extensibility for future engines.
**Option 2: Introduce a richer aggregate function model (similar to
`DataTypes`)**
- Rename `enum AggFunction` → `AggFunctionType`.
- Introduce a class `AggFunction` that can carry parameters (e.g.,
`delimiter`).
- Add a utility class `AggFunctions` with methods like:
```java
AggFunctions.SUM()
AggFunctions.LISTAGG(";")
```
- Allow inline definition in the schema builder:
```java
.column("tags", DataTypes.STRING(), AggFunctions.LISTAGG(";"))
```
- The Flink connector options would still translate into these schema
builder calls.
- ✅ *Benefits*: expressive, type-safe, and keeps logic co-located with the
column definition.
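For reference, a rough sketch of what the Option 2 model could look like (all
class shapes and names here are illustrative, not a settled design):
```java
// Illustrative only: a parameterized aggregate function model.
enum AggFunctionType { SUM, MAX, MIN, LISTAGG /* ... */ }

final class AggFunction {
    final AggFunctionType type;
    final String delimiter; // LISTAGG-specific; null means the default ","

    AggFunction(AggFunctionType type, String delimiter) {
        this.type = type;
        this.delimiter = delimiter;
    }
}

final class AggFunctions {
    static AggFunction SUM() {
        return new AggFunction(AggFunctionType.SUM, null);
    }

    static AggFunction LISTAGG(String delimiter) {
        return new AggFunction(AggFunctionType.LISTAGG, delimiter);
    }
}
```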
I’m fine with either approach. Which approach do you prefer?
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only
care about aggregated results rather than individual records. For records that
share the same primary key, it merges each value field of the incoming record
into the existing record, field by field, according to the specified aggregate
function.
+
+Each field that is not part of the primary key can be assigned an aggregate
function using the Schema API (recommended) or connector options
(`'fields.<field-name>.agg'`). If no function is specified for a field,
`last_value_ignore_nulls` aggregation is used by default.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage"
section below). The connector option is provided as an alternative for
connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+
+Schema schema = Schema.newBuilder()
+ .column("product_id", DataTypes.BIGINT())
+ .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("last_update_time", DataTypes.TIMESTAMP(3)) // Defaults to
LAST_VALUE_IGNORE_NULLS
+ .primaryKey("product_id")
+ .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+ .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored; the previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+ .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .column("verified_at", DataTypes.TIMESTAMP(3),
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1,
'[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+ .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
Review Comment:
It would be better to include an additional `LISTAGG` column that uses the
**default behavior** (i.e., comma as the delimiter). Since most users will
rely on the defaults, demonstrating the expected output in this common case
improves usability and sets clear expectations.
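For instance, the example could be extended like this (a sketch; the
`categories` column is added purely for illustration):
```java
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)       // custom delimiter
    .column("categories", DataTypes.STRING(), AggFunction.LISTAGG) // default delimiter
    .primaryKey("id")
    .build();

TableDescriptor.builder()
    .schema(schema)
    .property("table.merge-engine", "aggregation")
    // only "tags" overrides the delimiter; "categories" keeps the default ","
    .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
    .build();

// Input:  (1, 'developer', 'backend'), (1, 'java', 'infra')
// Result: (1, 'developer;java', 'backend,infra')
```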
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only
care about aggregated results rather than individual records. For records that
share the same primary key, it merges each value field of the incoming record
into the existing record, field by field, according to the specified aggregate
function.
+
+Each field that is not part of the primary key can be assigned an aggregate
function using the Schema API (recommended) or connector options
(`'fields.<field-name>.agg'`). If no function is specified for a field,
`last_value_ignore_nulls` aggregation is used by default.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage"
section below). The connector option is provided as an alternative for
connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+
+Schema schema = Schema.newBuilder()
+ .column("product_id", DataTypes.BIGINT())
+ .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("last_update_time", DataTypes.TIMESTAMP(3)) // Defaults to
LAST_VALUE_IGNORE_NULLS
+ .primaryKey("product_id")
+ .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
Review Comment:
```suggestion
- **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`,
`DOUBLE`, `DECIMAL`
```
nit: use code format for the data types. Same for the others.
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+---
+sidebar_label: Aggregation
+title: Aggregation Merge Engine
+sidebar_position: 4
+---
+
+# Aggregation Merge Engine
+
+## Overview
+
+The **Aggregation Merge Engine** is designed for scenarios where users only
care about aggregated results rather than individual records. For records that
share the same primary key, it merges each value field of the incoming record
into the existing record, field by field, according to the specified aggregate
function.
+
+Each field that is not part of the primary key can be assigned an aggregate
function using the Schema API (recommended) or connector options
(`'fields.<field-name>.agg'`). If no function is specified for a field,
`last_value_ignore_nulls` aggregation is used by default.
+
+This merge engine is useful for real-time aggregation scenarios such as:
+- Computing running totals and statistics
+- Maintaining counters and metrics
+- Tracking maximum/minimum values over time
+- Building real-time dashboards and analytics
+
+## Configuration
+
+To enable the aggregation merge engine, set the following table property:
+
+```
+'table.merge-engine' = 'aggregation'
+```
+
+Then specify the aggregate function for each non-primary key field using
connector options:
+
+```
+'fields.<field-name>.agg' = '<function-name>'
+```
+
+**Note**: The recommended way is to use the Schema API (see the "API Usage"
section below). The connector option is provided as an alternative for
connector-specific scenarios.
+
+## API Usage
+
+### Creating a Table with Aggregation
+
+```java
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+// Create connection
+Connection conn = Connection.create(config);
+Admin admin = conn.getAdmin();
+
+// Define schema with aggregation functions (recommended way)
+
+Schema schema = Schema.newBuilder()
+ .column("product_id", DataTypes.BIGINT())
+ .column("price", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("sales", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("last_update_time", DataTypes.TIMESTAMP(3)) // Defaults to
LAST_VALUE_IGNORE_NULLS
+ .primaryKey("product_id")
+ .build();
+
+// Create table with aggregation merge engine
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+TablePath tablePath = TablePath.of("my_database", "product_stats");
+admin.createTable(tablePath, tableDescriptor, false).get();
+```
+
+### Writing Data
+
+```java
+// Get table
+Table table = conn.getTable(tablePath);
+
+// Create upsert writer
+UpsertWriter writer = table.newUpsert().createWriter();
+
+// Write data - these will be aggregated
+writer.upsert(row(1L, 23.0, 15L, timestamp1));
+writer.upsert(row(1L, 30.2, 20L, timestamp2)); // Same primary key - triggers
aggregation
+
+writer.flush();
+```
+
+**Result after aggregation:**
+- `product_id`: 1
+- `price`: 30.2 (max of 23.0 and 30.2)
+- `sales`: 35 (sum of 15 and 20)
+- `last_update_time`: timestamp2 (last non-null value)
+
+## Supported Aggregate Functions
+
+Fluss currently supports the following aggregate functions:
+
+### sum
+
+Aggregates values by computing the sum across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Adds incoming values to the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("amount", DataTypes.DECIMAL(10, 2), AggFunction.SUM)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 100.50), (1, 200.75)
+// Result: (1, 301.25)
+```
+
+### product
+
+Computes the product of values across multiple rows.
+
+- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE,
DECIMAL
+- **Behavior**: Multiplies incoming values with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("discount_factor", DataTypes.DOUBLE(), AggFunction.PRODUCT)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 0.9), (1, 0.8)
+// Result: (1, 0.72) -- 90% * 80% = 72%
+```
+
+### max
+
+Identifies and retains the maximum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the larger value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("max_temperature", DataTypes.DOUBLE(), AggFunction.MAX)
+ .column("max_reading_time", DataTypes.TIMESTAMP(3), AggFunction.MAX)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 25.5, '2024-01-01 10:00:00'), (1, 28.3, '2024-01-01 11:00:00')
+// Result: (1, 28.3, '2024-01-01 11:00:00')
+```
+
+### min
+
+Identifies and retains the minimum value.
+
+- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT,
FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
+- **Behavior**: Keeps the smaller value between accumulator and incoming value
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("lowest_price", DataTypes.DECIMAL(10, 2), AggFunction.MIN)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 99.99), (1, 79.99), (1, 89.99)
+// Result: (1, 79.99)
+```
+
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+ .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01
11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
+```
+
+### last_value_ignore_nulls
+
+Replaces the previous value with the latest non-null value. This is the
**default aggregate function** when no function is specified.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Uses the latest incoming value only if it's not null
+- **Null Handling**: Null values are ignored; the previous value is retained
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .column("phone", DataTypes.STRING(), AggFunction.LAST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '[email protected]', '123-456'), (1, null, '789-012')
+// Result: (1, '[email protected]', '789-012')
+// Email remains '[email protected]' because the second upsert had null email
+```
+
+### first_value
+
+Retrieves and retains the first value seen for a field.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received value, ignores all subsequent values
+- **Null Handling**: Null values are retained if received first
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("first_purchase_date", DataTypes.DATE(), AggFunction.FIRST_VALUE)
+ .column("first_product", DataTypes.STRING(), AggFunction.FIRST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, '2024-01-01', 'ProductA'), (1, '2024-02-01', 'ProductB')
+// Result: (1, '2024-01-01', 'ProductA')
+```
+
+### first_value_ignore_nulls
+
+Selects the first non-null value in a data set.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Keeps the first received non-null value, ignores all
subsequent values
+- **Null Handling**: Null values are ignored until a non-null value is received
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("email", DataTypes.STRING(), AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .column("verified_at", DataTypes.TIMESTAMP(3),
AggFunction.FIRST_VALUE_IGNORE_NULLS)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, null, null), (1, '[email protected]', '2024-01-01 10:00:00'), (1,
'[email protected]', '2024-01-02 10:00:00')
+// Result: (1, '[email protected]', '2024-01-01 10:00:00')
+```
+
+### listagg
+
+Concatenates multiple string values into a single string with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Concatenates values using the specified delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("tags", DataTypes.STRING(), AggFunction.LISTAGG)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+ .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### string_agg
+
+Alias for `listagg`. Concatenates multiple string values into a single string
with a delimiter.
+
+- **Supported Data Types**: STRING, CHAR
+- **Behavior**: Same as `listagg` - concatenates values using the specified
delimiter
+- **Null Handling**: Null values are skipped
+- **Configuration**: Use
`'table.merge-engine.aggregate.<field-name>.listagg-delimiter'` to specify a
custom delimiter (default is comma `,`)
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("tags", DataTypes.STRING(), AggFunction.STRING_AGG)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.merge-engine.aggregate.tags.listagg-delimiter", ";")
+ .build();
+
+// Input: (1, 'developer'), (1, 'java'), (1, 'flink')
+// Result: (1, 'developer;java;flink')
+```
+
+### bool_and
+
+Evaluates whether all boolean values in a set are true (logical AND).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true only if all values are true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("has_all_permissions", DataTypes.BOOLEAN(), AggFunction.BOOL_AND)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, true), (1, true), (1, false)
+// Result: (1, false) -- Not all values are true
+```
+
+### bool_or
+
+Checks if at least one boolean value in a set is true (logical OR).
+
+- **Supported Data Types**: BOOLEAN
+- **Behavior**: Returns true if any value is true
+- **Null Handling**: Null values are ignored
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("has_any_alert", DataTypes.BOOLEAN(), AggFunction.BOOL_OR)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, false), (1, false), (1, true)
+// Result: (1, true) -- At least one value is true
+```
+
+## Advanced Configuration
+
+### Default Aggregate Function
+
+Non-primary key fields that don't have an explicitly specified aggregate
function fall back to the default, `last_value_ignore_nulls`:
+
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("col1", DataTypes.STRING()) // Defaults to LAST_VALUE_IGNORE_NULLS
+ .column("col2", DataTypes.BIGINT(), AggFunction.SUM) // Explicitly set to
SUM
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+```
+
+In this example:
+- `col2` uses `sum` aggregation (explicitly specified)
+- `col1` uses `last_value_ignore_nulls` as the default
+
+### Partial Update with Aggregation
+
+The aggregation merge engine supports partial updates through the UpsertWriter API. When performing a partial update:
+
+- **Target columns**: These columns will be aggregated according to their configured aggregate functions
+- **Non-target columns**: These columns will retain their existing values from the old row
+
+**Example:**
+
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("count1", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("count2", DataTypes.BIGINT(), AggFunction.SUM)
+ .column("sum1", DataTypes.DOUBLE(), AggFunction.SUM)
+ .column("sum2", DataTypes.DOUBLE(), AggFunction.SUM)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor tableDescriptor = TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Create partial update writer targeting only id, count1, and sum1
+int[] targetColumns = new int[]{0, 1, 3}; // id, count1, sum1
+UpsertWriter partialWriter = table.newUpsert()
+ .withPartialUpdate(targetColumns)
+ .createWriter();
+
+// When writing:
+// - count1 and sum1 will be aggregated with existing values
+// - count2 and sum2 will remain unchanged
+```
+
+**Use cases for partial aggregation** (see the sketch below):
+1. **Independent metrics**: When different processes update different subsets of metrics for the same key
+2. **Reduced data transfer**: Only send the columns that need to be updated
+3. **Flexible pipelines**: Different data sources can contribute to different aggregated fields
+
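+For illustration, a minimal sketch (under the schema above) of two writers contributing to disjoint metric subsets for the same key; `table` and the column indexes are the ones used earlier on this page:
+
+```java
+// Writer A only ever touches count1; writer B only ever touches count2.
+// Upserts from one writer leave the other writer's columns untouched.
+UpsertWriter writerA = table.newUpsert()
+    .withPartialUpdate(new int[]{0, 1}) // id, count1
+    .createWriter();
+
+UpsertWriter writerB = table.newUpsert()
+    .withPartialUpdate(new int[]{0, 2}) // id, count2
+    .createWriter();
+```
+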
+### Delete Behavior
+
+The aggregation merge engine provides limited support for delete operations. You can configure the behavior using the `'table.agg.remove-record-on-delete'` option:
+
+```java
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .property("table.agg.remove-record-on-delete", "true") // Default is false
+ .build();
+```
+
+**Configuration options**:
+- **`'table.agg.remove-record-on-delete' = 'false'`** (default): Delete operations will cause an error
+- **`'table.agg.remove-record-on-delete' = 'true'`**: Delete operations will remove the entire record from the table
Review Comment:
This configuration appears to serve the same purpose as (and may conflict with) the existing `table.delete.behavior` setting, which defaults to `ALLOW` for regular tables but defaults to `IGNORE` for the `FIRST_ROW` and `VERSIONED` merge engines.
I suggest we **reuse** `table.delete.behavior` and simply extend its logic to also default to `IGNORE` for the **aggregation merge engine**, rather than introducing a new, overlapping configuration. This keeps the configuration surface consistent and avoids ambiguity.
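For illustration, a rough sketch of what the suggested reuse could look like on an aggregation table (the value spelling follows this comment; the `IGNORE` default for aggregation is the proposal here, not current behavior):

```java
TableDescriptor.builder()
    .schema(schema)
    .property("table.merge-engine", "aggregation")
    // Proposed: reuse the existing option rather than adding a new agg-specific one.
    // IGNORE would silently drop deletes; ALLOW would apply them.
    .property("table.delete.behavior", "IGNORE")
    .build();
```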
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+:::note
+**Current Limitation**: The aggregation merge engine does not support retraction semantics (e.g., subtracting from a sum, reverting a max). Delete operations can only remove the entire record or be rejected.
+
+Future versions may support fine-grained retraction by enhancing the protocol to carry row data with delete operations.
+:::
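+
+For illustration, a minimal sketch of the record-removal behavior under `'table.agg.remove-record-on-delete' = 'true'`, assuming a single `sum` value column and, hypothetically, a `delete` method on the upsert writer (only the primary key matters for the delete):
+
+```java
+UpsertWriter writer = table.newUpsert().createWriter();
+writer.upsert(row(1L, 100L));
+writer.upsert(row(1L, 200L)); // aggregates to (1, 300) under sum
+writer.delete(row(1L, null)); // hypothetical delete API: removes the whole record for key 1
+writer.flush();
+```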
+
+## Performance Considerations
+
+1. **Choose Appropriate Aggregate Functions**: Select functions that match your use case to avoid unnecessary computations
+2. **Primary Key Design**: Use appropriate primary keys to ensure proper grouping of aggregated data
+3. **Null Handling**: Be aware of how each function handles null values to avoid unexpected results
+4. **Delete Handling**: If you need to handle delete operations, be aware that enabling `'table.agg.remove-record-on-delete' = 'true'` will remove entire records rather than retracting aggregated values
+
+## Limitations
+
+:::warning Critical Limitations
+When using the `aggregation` merge engine, be aware of the following critical limitations:
+
+### 1. Exactly-Once Semantics
+
+**The Fluss engine does not natively support transactional writes, and therefore does not directly support Exactly-Once semantics at the storage layer.**
+
+Exactly-Once semantics should be achieved through integration with compute engines (e.g., Flink, Spark). For example, after failover, undo operations can be generated for invalid writes to achieve rollback.
+
+For detailed information about the Exactly-Once implementation, please refer to: [FIP-21: Aggregation Merge Engine](https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine)
Review Comment:
From a user’s perspective, the current wording may give the impression that Fluss’s aggregate merge engine **does not support exactly-once semantics at all**. To avoid confusion, we should first clarify this at the beginning of the section:
- **When writing to an aggregate merge engine table using the Flink engine**, Fluss **does provide exactly-once guarantees**. Thanks to Flink’s checkpointing mechanism, in the event of a failure and recovery, the Flink connector automatically performs an **undo operation** to roll back the table state to what it was at the last successful checkpoint. This ensures no over-counting or under-counting: data remains consistent and accurate.
Then we should mention the limitation:
- **However, when using the Fluss client API directly** (outside of Flink), **exactly-once is not provided out of the box**. In such cases, users must implement their own recovery logic (similar to what the Flink connector does) by explicitly resetting the table state to a previous version through undo operations.
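For illustration, a minimal sketch of the prerequisite on the Flink side, assuming a standard DataStream job setup (the undo-based rollback described above is driven by Flink checkpoints):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Exactly-once for aggregation tables relies on checkpoint-based recovery,
// so checkpointing must be enabled on the job.
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
```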
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+### 2. Delete Operations
+
+By default, delete operations will cause errors:
+- You must set `'table.agg.remove-record-on-delete' = 'true'` if you need to handle delete operations
+- This configuration will remove the entire aggregated record, not reverse individual aggregations
+
+### 3. Data Type Restrictions
+
+Each aggregate function supports specific data types (see function documentation above).
+:::
Review Comment:
I don’t consider this a true limitation; after all, aggregate functions in compute engines (like Flink or Spark) also don’t support all data types universally. I suggest we **remove this item** from the list of limitations.
Separately, we should **explicitly document the limitation around retraction messages** (i.e., handling of `UPDATE_BEFORE` messages). It’s important to clarify which aggregate functions support retraction and which do not, as this directly impacts correctness in streaming workloads.
I think we can also add a **Retraction Handling** entry to each aggregate function's documentation, and include a brief description in the "Limitations" section.
For reference, Paimon provides a clear treatment of this topic here: https://paimon.apache.org/docs/master/primary-key-table/merge-engine/aggregation/#retraction
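For illustration, one possible shape for such an entry (hypothetical wording; whether a function can retract follows from its math, e.g. `sum` has an inverse while `max` does not):

```
- **Retraction Handling**: Retractions (`UPDATE_BEFORE`) are applied as the inverse
  operation, e.g. subtraction for `sum`. Functions without an inverse, such as `max`,
  `min`, `first_value`, and `last_value`, cannot undo a retracted value and may
  produce inaccurate results after retraction.
```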
##########
website/docs/table-design/table-types/pk-table/merge-engines/aggregation.md:
##########
@@ -0,0 +1,534 @@
+### last_value
+
+Replaces the previous value with the most recently received value.
+
+- **Supported Data Types**: All data types
+- **Behavior**: Always uses the latest incoming value
+- **Null Handling**: Null values will overwrite previous values
+
+**Example:**
+```java
+Schema schema = Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("status", DataTypes.STRING(), AggFunction.LAST_VALUE)
+ .column("last_login", DataTypes.TIMESTAMP(3), AggFunction.LAST_VALUE)
+ .primaryKey("id")
+ .build();
+
+TableDescriptor.builder()
+ .schema(schema)
+ .property("table.merge-engine", "aggregation")
+ .build();
+
+// Input: (1, 'online', '2024-01-01 10:00:00'), (1, 'offline', '2024-01-01 11:00:00')
+// Result: (1, 'offline', '2024-01-01 11:00:00')
Review Comment:
It's better to insert a `null` for one of the `last_value` columns to demonstrate the null handling.
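For illustration, the example's input/result comments could become something like this (hypothetical values; the result follows from `last_value`'s documented behavior of letting nulls overwrite previous values):

```java
// Input: (1, 'online', '2024-01-01 10:00:00'), (1, null, '2024-01-01 11:00:00')
// Result: (1, null, '2024-01-01 11:00:00') -- the null status overwrites 'online'
```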