Hi Yang,

Thanks for the great proposal. It’s very detailed and covers all
aspects comprehensively.

I have the following comments on the design:

**(1) Handling failover before the first checkpoint is completed**
If a job starts and writes some data but fails over before completing its
first checkpoint, the undo log mechanism won’t be triggered, leading
to duplicate data. To address this, I think we may need to rely on
Fluss to store the initial offset state.

One possible solution:
① Only the Coordinator can accept `AcquireColumnLockRequest`. The
request can carry offset information, and the Coordinator persists the
`owner-id`, `columns`, and `offsets` to ZooKeeper.
② At job startup, Sink Task-0 fetches a full snapshot of the table’s
offsets and registers a column lock with the Fluss Coordinator.
③ Upon failover, the sink checks whether it’s recovering from state or
starting statelessly. In the stateless case, it retrieves the
persisted offsets for the `owner-id` from the Coordinator/ZooKeeper,
compares them with the latest cluster offsets, and then reconstructs
the redo log accordingly.
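
A rough sketch of the offset comparison in step ③, to make the idea concrete. All names here (`StatelessRecoverySketch`, `OffsetRange`, `rangesToRevisit`) are hypothetical and not part of the Fluss API, and it assumes offsets are tracked per bucket as plain longs:

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; none of these names exist in the Fluss API. */
final class StatelessRecoverySketch {

    /** Half-open changelog range [start, end) that may hold uncheckpointed writes. */
    static final class OffsetRange {
        final long start;
        final long end;

        OffsetRange(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Compares the offsets persisted when the column lock was acquired with the
     * latest cluster offsets. Buckets whose offset did not advance are skipped.
     */
    static Map<Integer, OffsetRange> rangesToRevisit(
            Map<Integer, Long> persistedOffsets, Map<Integer, Long> latestOffsets) {
        Map<Integer, OffsetRange> ranges = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : persistedOffsets.entrySet()) {
            long start = entry.getValue();
            // Assumption: a bucket absent from the latest snapshot has not advanced.
            long end = latestOffsets.getOrDefault(entry.getKey(), start);
            if (end > start) {
                ranges.put(entry.getKey(), new OffsetRange(start, end));
            }
        }
        return ranges;
    }
}
```

Only the buckets returned here would need their log read back; everything else is already consistent with the persisted snapshot.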

**Tip**: We don’t need `schema_id` in `AcquireColumnLockRequest` if we
use column ids instead of column indexes.

**(2) Handling Dynamic Partitioning**
When dynamic partitioning is enabled on a table, a similar issue
arises: new partitions may be created and written to, but if failover
occurs before the first checkpoint, duplicate data can appear in those
new partitions.

To handle this, during failover recovery from state, we must retrieve
**all partitions and their corresponding offsets**. If a partition’s
offset is missing from the state, we should start consuming its redo
log from offset `0`.
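
As a minimal sketch of that fallback (hypothetical names, and simplified to one offset per partition, whereas a real table would track offsets per bucket within each partition):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper; not part of the Fluss API. */
final class PartitionOffsetSketch {

    /**
     * Resolves the start offset for every partition that currently exists.
     * Partitions created after the last checkpoint are missing from the
     * restored state, so they start from offset 0.
     */
    static Map<String, Long> startOffsets(
            Set<String> livePartitions, Map<String, Long> checkpointedOffsets) {
        Map<String, Long> result = new TreeMap<>();
        for (String partition : livePartitions) {
            result.put(partition, checkpointedOffsets.getOrDefault(partition, 0L));
        }
        return result;
    }
}
```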

**(3) Improvements to lock owner ID and TTL parameters**
The current parameters `client.writer.lock-owner-id` and
`client.column-lock.default-ttl` can be simplified and unified as:
- `client.column-lock.owner-id`
- `client.column-lock.ttl`

Moreover, these parameters should work **by default without explicit
configuration**. For example, we can generate a default `owner-id`
from the Flink Job ID + column index:
- The Job ID ensures a new ID for every fresh job submission.
- During failover restarts, the Job ID remains unchanged.

Additionally, we should **checkpoint the `owner-id` into the sink
state**, so it remains consistent during failover or job version
upgrades.
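
One possible way to derive the default, as a sketch only. The `<jobId>-<sorted column indexes>` format and the `OwnerIdSketch` class are my assumptions, not something the FIP defines:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper; the id format is an assumption for illustration. */
final class OwnerIdSketch {

    /**
     * Builds a default owner id from the Flink Job ID and the target column
     * indexes. Indexes are sorted so the same column set always yields the
     * same id, regardless of the order in which columns were listed.
     */
    static String defaultOwnerId(String flinkJobId, int[] columnIndexes) {
        String columns = Arrays.stream(columnIndexes)
                .sorted()
                .mapToObj(String::valueOf)
                .collect(Collectors.joining("_"));
        return flinkJobId + "-" + columns;
    }
}
```

Because the Job ID is stable across failover restarts, the derived id is stable too; checkpointing it covers the cases where the Job ID itself changes.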

**(4) Connector Options & TableDescriptor Configuration API**
Currently, users must configure aggregate columns via connector
options in the Java client. However, we consider aggregate functions
part of the **schema definition**, similar to how StarRocks [1] and
ClickHouse [2] define aggregation directly in column DDL (e.g., `pv
BIGINT SUM`).

The need for options today stems from Flink SQL’s lack of support for
such DDL syntax. But for future engine integrations, we’d prefer a
native DDL-based approach.

Therefore:

Add a method in the `Schema`/`SchemaBuilder` layer:

```java
aggColumn(String columnName, DataType dataType, AggFunction aggFunction)
```
The `AggFunction` should be an Enum type. This method is similar to
how we added `incrementColumn`.

Then rename `table.fields.total_orders.aggregate-function` to the
connector option `fields.total_orders.agg`. We can shorten
`aggregate-function` to `agg` here. The connector option is then translated
into the corresponding schema builder call.
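
To illustrate the translation, here is a self-contained sketch. `SchemaBuilderSketch` is a hypothetical stand-in for the real `Schema` builder, data types are plain strings for brevity, and the enum values are only an illustrative subset:

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Illustrative subset only; the FIP defines the actual function list. */
enum AggFunction { SUM, LAST_VALUE, LAST_VALUE_IGNORE_NULLS, LISTAGG }

/** Hypothetical stand-in for the schema builder; not the real Fluss class. */
final class SchemaBuilderSketch {
    private final Map<String, String> columnTypes = new LinkedHashMap<>();
    private final Map<String, AggFunction> aggFunctions = new LinkedHashMap<>();

    /** Mirrors the proposed aggColumn(columnName, dataType, aggFunction) method. */
    SchemaBuilderSketch aggColumn(String columnName, String dataType, AggFunction aggFunction) {
        columnTypes.put(columnName, dataType);
        aggFunctions.put(columnName, aggFunction);
        return this;
    }

    Map<String, AggFunction> aggFunctions() {
        return aggFunctions;
    }

    /** Translates options of the form fields.<column>.agg into aggColumn calls. */
    static SchemaBuilderSketch fromOptions(
            Map<String, String> columnTypes, Map<String, String> options) {
        SchemaBuilderSketch builder = new SchemaBuilderSketch();
        for (Map.Entry<String, String> column : columnTypes.entrySet()) {
            String value = options.get("fields." + column.getKey() + ".agg");
            if (value != null) {
                // Enum lookup rejects unknown function names with IllegalArgumentException.
                AggFunction fn = AggFunction.valueOf(value.trim().toUpperCase(Locale.ROOT));
                builder.aggColumn(column.getKey(), column.getValue(), fn);
            }
        }
        return builder;
    }
}
```

With an Enum, a typo in the option value fails fast at table creation instead of surfacing later at write time.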


**(5) Default value for `table.column-lock.enabled`**
Currently, this is `false` by default. However, if the aggregate merge
engine **requires** column locking, the Fluss Coordinator can **enable it
automatically** when creating an agg merge engine table, unless the
user explicitly sets it to `false`.
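
In code, the resolution rule could look roughly like this (hypothetical helper, not an existing Fluss method):

```java
import java.util.Optional;

/** Hypothetical helper; not part of the Fluss API. */
final class ColumnLockDefaultSketch {

    /**
     * An explicit user setting always wins. Otherwise column locking is
     * enabled exactly when the table uses the aggregate merge engine.
     */
    static boolean columnLockEnabled(Optional<Boolean> explicitSetting, boolean aggMergeEngine) {
        return explicitSetting.orElse(aggMergeEngine);
    }
}
```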

**(6) Aggregate functions**
The behavior of `last_value` ignoring `NULL`s is standardized via the
`IGNORE NULLS` clause (e.g., in Databricks [3] and ClickHouse [4]).
Thus, I suggest renaming:
- `last_non_null_value` → `last_value_ignore_nulls`
- `first_non_null_value` → `first_value_ignore_nulls`

We should also introduce:
- `string_agg` (alias for `listagg`) [5]
- `array_agg` (returns `ARRAY<T>`) [6]

**(7) Connection Interface for Recovery Mode**
Exposing a "recovery mode" at the `Connection` level leaks too much
internal complexity. The `Connection` is the primary API entry point,
while redo log handling is an advanced feature rarely used directly by
end users.

Instead, I suggest embedding the override mode directly in
`PutKvRequest` via a new field:
```proto
// null (default): replace, 0: replace, 1: accumulate, 2: merge_state
optional int32 agg_mode = 6;
```

In the future, `merge_state` mode could allow the sink to perform
**local aggregation** and send the aggregated state to the server for
direct merging to reduce RocksDB write amplification. This would be a
valuable production optimization, analogous to Flink’s mini-batch [7].
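
A small sketch of how the field could be decoded, plus what local aggregation might look like for a `SUM` column. `AggMode` and `LocalAggregationSketch` are hypothetical names, and the pre-aggregation shown is only one possible shape of a `merge_state` payload:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical client-side view of the proposed agg_mode field. */
enum AggMode {
    REPLACE(0), ACCUMULATE(1), MERGE_STATE(2);

    final int wireValue;

    AggMode(int wireValue) {
        this.wireValue = wireValue;
    }

    /** An unset field (null) falls back to REPLACE, as in the proto comment. */
    static AggMode fromWire(Integer value) {
        if (value == null) {
            return REPLACE;
        }
        for (AggMode mode : values()) {
            if (mode.wireValue == value) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown agg_mode: " + value);
    }
}

/** Hypothetical sink-side pre-aggregation for a SUM column. */
final class LocalAggregationSketch {

    /** Sums values per key so each key is sent to the server once per batch. */
    static Map<String, Long> preAggregateSum(List<Map.Entry<String, Long>> batch) {
        Map<String, Long> partial = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : batch) {
            partial.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return partial;
    }
}
```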

**(8) If aggregate-function not configured, default to last_non_null_value**
I suggest **requiring users to explicitly configure an aggregate
function**, as this makes semantics clearer. We can consider adding a
fallback behavior later **only if** user feedback shows it’s truly
needed.
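
The check itself is cheap. A minimal sketch, assuming validation happens at table creation and that every non-primary-key column needs a function (both are my assumptions, and the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical validation helper; not part of the Fluss API. */
final class AggFunctionValidationSketch {

    /** Rejects the table definition if any non-key column lacks an aggregate function. */
    static void requireExplicitAggFunctions(
            List<String> nonKeyColumns, Map<String, String> aggByColumn) {
        List<String> missing = new ArrayList<>();
        for (String column : nonKeyColumns) {
            String fn = aggByColumn.get(column);
            if (fn == null || fn.trim().isEmpty()) {
                missing.add(column);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "No aggregate function configured for columns: " + missing);
        }
    }
}
```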

Best,
Jark Wu

**References**
[1] https://docs.starrocks.io/docs/table_design/table_types/aggregate_table/#create-a-table
[2] https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
[3] https://docs.databricks.com/aws/en/sql/language-manual/functions/last_value
[4] https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
[5] https://docs.databricks.com/aws/en/sql/language-manual/functions/string_agg
[6] https://docs.databricks.com/aws/en/sql/language-manual/functions/array_agg
[7] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/

On Mon, 8 Dec 2025 at 15:30, Yang Wang <[email protected]> wrote:
>
> Hi Cheng,
> Thanks for the thoughtful feedback and for bringing up the RocksDB 2PC
> approach.
> You've identified the core challenge precisely: data visibility vs.
> real-time processing. This is exactly why we chose the Undo Recovery
> mechanism over transaction-based approaches in the proposal.
>
> *Key Considerations:*
>
> *1. Real-time Visibility Conflict*
> As you mentioned,
> RocksDB 2PC would require delayed visibility until transaction commit. For
> Fluss's positioning as a real-time streaming storage, this conflicts with
> our fundamental requirement that writes should be immediately queryable. In
> typical scenarios (e.g., real-time dashboards), users expect second-level
> updates, not waiting for Flink checkpoint completion (which could be tens
> of seconds).
>
> *2. Already Evaluated and Rejected*
> We actually evaluated transaction
> mechanisms in the FIP design phase. From the "Rejected Alternatives"
> section:
> > Use Transaction Mechanism to Implement Exactly-Once
> >
> > Disadvantages: Extremely high implementation complexity, requires
> > refactoring Fluss's write path, high performance overhead (requires delayed
> > visibility, increased commit overhead), conflicts with Fluss's real-time
> > visibility design philosophy
> >
> > Rejection Reason: Cost too high, inconsistent with Fluss's real-time
> > streaming storage positioning
>
> (See FIP Section: "Rejected Alternatives")
>
> *3. Additional Complexity with RocksDB 2PC*
> Beyond visibility issues:
>
>    - Distributed coordination: Requires a global transaction coordinator
>    across multiple TabletServers
>    - Flink checkpoint alignment: How to coordinate RocksDB commit with
>    asynchronous Flink checkpoints?
>    - Multi-job concurrency: Column-level partial updates would require
>    complex transaction isolation coordination
>    - Performance overhead: Prepare/commit overhead exists for every write,
>    even in normal cases
>
>
> *4. Why Undo Recovery Fits Better*
> Our approach optimizes for the common
> case:
>
>    - Normal writes: Zero transaction overhead, immediate visibility
>    - Failover (rare): Pay the cost of undo operations only when needed
>    - Lightweight: Leverages existing Changelog capability, no global
>    coordinator needed
>    - Localized: Each bucket handles recovery independently via offset
>    comparison
>
> *Summary*
> While RocksDB 2PC is theoretically cleaner from a database perspective, it
> introduces unacceptable trade-offs for Fluss's real-time streaming use
> cases. The Undo Recovery approach better aligns with our "optimize for the
> common path" philosophy and maintains Fluss's real-time characteristics.
> Would love to discuss further if you have additional thoughts!
>
> Best regards,
> Yang
