Hi Hongshun, Thanks for your feedback. Let me answer your questions:
1. The Partial Update feature must allow concurrent update streams against the same table. Therefore, our locks are defined based on column-level mutual exclusion: as long as two streams do not update the same column, there will be no issues that could break aggregation and undo consistency. 2. The key to avoiding deadlocks is that locks must be acquired following the same partial order, via the same sequence of arbitration nodes. We have implemented this as much as possible in the design of the column-locking framework. In addition, a fallback mechanism is a lock TTL: even in corner cases where a lock lease is not correctly renewed, the lock will automatically expire due to the TTL. Finally, since this FIP is currently too complex, I plan to remove the sections related to column locks from this FIP and split column locks into a separate sub-FIP for submission. Best, Yang Hongshun Wang <[email protected]> 于2025年12月22日周一 10:43写道: > Hi Yang, > Thanks for your significant job. I am interested in the Column Lock Design > and would like to ask some questions: > 1. The lock granularity is (tableId, partitionId, columns). Does this mean > that for a single (tableId, partitionId), there could be multiple locks for > different columns? If not, it seems that (tableId, partitionId) would > suffice as the lock key. > > 2. The ClientColumnLockManager (rather than the coordinator server) holds > the lock. What happens if a deadlock occurs? For example: Client 1 holds > the lock for tablet server 1 while Client 2 also holds the lock for tablet > server 2. Then both ask for another lock. > > Best, > Hongshun > > On Fri, Dec 19, 2025 at 11:00 AM Yang Wang <[email protected]> > wrote: > > > Hi Keith Lee, > > > > Thanks for your feedback—this is a good question. I believe RowMerger > mode > > can cover most common cases. The main difference across aggregate > functions > > is the underlying data type of the accumulator state. In some cases, we > may > > need a more complex state type, such as ArrayType or MapType. > > > > For example, for percentile aggregation, we could introduce a “syntactic > > sugar” data type like Histogram (like prometheus). It could be read from > > and written to as an INT/BIGINT, while its underlying storage is an > > ArrayType. We could then implement a dedicated aggregate function on top > of > > it. > > > > Best regards, > > Yang > > > > Keith Lee <[email protected]> 于2025年12月19日周五 04:48写道: > > > > > Hi Yang, > > > > > > Thank you for the well thought through proposal. I have mainly one > > question > > > > > > 1. Have you considered adding percentile aggregations? e.g. median, p99 > > or > > > p1. I find that these can be tremendously useful to get a good > > > representation of the dataset without being affected by outliers e.g. > > > min/max. Granted, the current `RowMerger` interface allows for merging > of > > > individual rows and calculating percentile in streaming manner needs > more > > > information than current and last rows; Considering what it would take > to > > > implement percentile aggregations may reveal solutions more amenable to > > > implement more types of useful aggregations in the future. > > > > > > Best regards > > > Keith Lee > > > > > > > > > On Thu, Dec 18, 2025 at 12:14 PM Yang Wang <[email protected]> > > > wrote: > > > > > > > Hi, Jark. > > > > Thanks for your comprehensive and valuable feedback. This has made > the > > > > FIP proposal > > > > more complete. Let me respond to the comments one by one: > > > > > > > > - Comments (1) and (2) essentially target the same issue: for > > buckets > > > > without checkpoints, how do we determine which updates need to be > > > rolled > > > > back? Persisting state to Fluss is an excellent solution. It > allows > > us > > > > to > > > > distinguish whether an instance is starting for the first time or > > > > recovering from a failover without checkpoints. This is a very > > > important > > > > suggestion. I will add a handling plan for this corner case to the > > > FIP. > > > > - I strongly agree with suggestions (3), (4), (5), and (6). I will > > > > update the FIP accordingly. > > > > - Regarding suggestion (7), I have some concerns. In fact, our API > > > > design and implementation architecture influence each other. The > > core > > > > issue > > > > is that for the same bucket of the same table, we cannot allow the > > > send > > > > queue to contain mixed WriteBatches based on different agg_modes, > as > > > > this > > > > would lead to non-deterministic write results. To introduce > agg_mode > > > > cleanly, we would need significant refactoring of both the > > upper-layer > > > > write API and the batching/aggregation sending architecture. This > > > > complexity may be unnecessary at the moment. Designing a > > > “Recovery-mode” > > > > connection is actually a compromise to introduce the minimal > > > complexity > > > > while still providing correct semantics. Perhaps we can discuss > this > > > > further. > > > > - Regarding suggestion (8), considering that users may work with > > very > > > > wide tables, they might be forced to add extra configuration items > > for > > > > many > > > > columns that have no special aggregation needs, which would hurt > > user > > > > experience and bloat configuration. Apache Paimon defaults to > using > > > the > > > > last_non_null_value aggregation function for unspecified columns, > > and > > > I > > > > believe most users may already be accustomed to this behavior. It > > > might > > > > be > > > > better for us to stay consistent with Paimon. > > > > > > > > Best regards, > > > > Yang > > > > > > > > Jark Wu <[email protected]> 于2025年12月14日周日 23:37写道: > > > > > > > > > Hi Yanng, > > > > > > > > > > Thanks for the great proposal, it’s very detailed and covers all > > > > > aspects comprehensively. > > > > > > > > > > I have the following comments on the design: > > > > > > > > > > **(1) Handling failover before the first checkpoint is completed** > > > > > If a job starts, writes some data, but failover before completing > its > > > > > first checkpoint, the undo log mechanism won’t be triggered, > leading > > > > > to duplicate data. To address this, I think we may need to rely on > > > > > Fluss to store the initial offset state. > > > > > > > > > > One possible solution: > > > > > ① Only the Coordinator can accept `AcquireColumnLockRequest`. The > > > > > request can carry offset information, and the Coordinator persists > > the > > > > > `owner-id`, `columns`, and `offsets` to ZooKeeper. > > > > > ② At job startup, Sink Task-0 fetches a full snapshot of the > table’s > > > > > offsets and registers a column lock with the Fluss Coordinator. > > > > > ③ Upon failover, the sink checks whether it’s recovering from state > > or > > > > > starting statelessly. In the stateless case, it retrieves the > > > > > persisted offsets for the `owner-id` from the > Coordinator/ZooKeeper, > > > > > compares them with the latest cluster offsets, and then > reconstructs > > > > > the redo log accordingly. > > > > > > > > > > **Tip**: We don’t need `schema_id` in `AcquireColumnLockRequest` if > > we > > > > > use column ids instead of column indexes. > > > > > > > > > > **(2) Handling Dynamic Partitioning** > > > > > When dynamic partitioning is enabled on a table, a similar issue > > > > > arises: new partitions may be created and written to, but if > failover > > > > > occurs before the first checkpoint, duplicate data can appear in > > those > > > > > new partitions. > > > > > > > > > > To handle this, during failover recovery from state, we must > retrieve > > > > > **all partitions and their corresponding offsets**. If a > partition’s > > > > > offset is missing from the state, we should start consuming its > redo > > > > > log from offset `0`. > > > > > > > > > > **(3) Improvements to lock owner ID and TTL parameters** > > > > > The current parameters `client.writer.lock-owner-id` and > > > > > `client.column-lock.default-ttl` can be simplified and unified as: > > > > > - `client.column-lock.owner-id` > > > > > - `client.column-lock.ttl` > > > > > > > > > > Moreover, these parameters should work **by default without > explicit > > > > > configuration**. For example, we can generate a default `owner-id` > > > > > from the Flink Job ID + column index: > > > > > - The Job ID ensures a new ID for every fresh job submission. > > > > > - During failover restarts, the Job ID remains unchanged. > > > > > > > > > > Additionally, we should **checkpoint the `owner-id` into the sink > > > > > state**, so it remains consistent during failover or job version > > > > > upgrades. > > > > > > > > > > **(4) Connector Options & TableDescriptor Configuration API** > > > > > Currently, users must configure aggregate columns via connector > > > > > options in the Java client. However, we consider aggregate > functions > > > > > part of the **schema definition**, similar to how StarRocks [1] and > > > > > ClickHouse [2] define aggregation directly in column DDL (e.g., `pv > > > > > BIGINT SUM`). > > > > > > > > > > The need for options today stems from Flink SQL’s lack of support > for > > > > > such DDL syntax. But for future engine integrations, we’d prefer > > > > > native DDL-based way. > > > > > > > > > > Therefore: > > > > > > > > > > Add a method in the `Schema`/`SchemaBuilder` layer: > > > > > > > > > > ```java > > > > > aggColumn(String columnName, DataType dataType, AggFunction > > > aggFunction) > > > > > ``` > > > > > The `AggFunction` should be an Enum type. This method is similar to > > > > > how we added `incrementColumn`. > > > > > > > > > > Then rename `table.fields.total_orders.aggregate-function` to the > > > > > connector option `fields.total_orders.agg`. We can shorten > > > > > `aggregate-function` to `agg` here. The connector option is > > translated > > > > > into the schema builder. > > > > > > > > > > > > > > > **(5) Default value for `table.column-lock.enabled`** > > > > > Currently, this is `false` by default. However, if the aggregate > > merge > > > > > engine **requires** column locking, Fluss coordinator can **enable > it > > > > > automatically** when creating an agg merge engine table, unless the > > > > > user explicitly sets it to `false`. > > > > > > > > > > **(6) Aggregate functions** > > > > > The behavior of `last_value` ignoring `NULL`s is standardized via > the > > > > > `IGNORE NULLS` clause (e.g., in Databricks [3] and ClickHouse [4]). > > > > > Thus, I suggest renaming: > > > > > - `last_non_null_value` → `last_value_ignore_nulls` > > > > > - `first_non_null_value` → `first_value_ignore_nulls` > > > > > > > > > > Additionally, we should also introduce: > > > > > - `string_agg` (alias for `listagg`) [5] > > > > > - `array_agg` (returns `ARRAY<T>`) [6] > > > > > > > > > > **(7) Connection Interface for Recovery Mode** > > > > > Exposing a "recovery mode" at the `Connection` level leaks too much > > > > > internal complexity. The `Connection` is the primary API entry > point, > > > > > while redo log handling is an advanced feature rarely used directly > > by > > > > > end users. > > > > > > > > > > Instead, I suggest embedding the override mode directly in > > > > > `PutKvRequest` via a new field: > > > > > ```proto > > > > > optional int32 agg_mode = 6; // null (default): replace, 0: > replace, > > > > > 1: accumulate, 2: merge_state > > > > > ``` > > > > > > > > > > In the future, `merge_state` mode could allow the sink to perform > > > > > **local aggregation** and send the aggregated state to the server > for > > > > > direct merging to reduce RocksDB write amplification. This would > be a > > > > > valuable production optimization, analogous to Flink’s mini-batch > > [7]. > > > > > > > > > > **(8) If aggregate-function not configured, default to > > > > last_non_null_value > > > > > I suggest **requiring users to explicitly configure an aggregate > > > > > function**, as this makes semantics clearer. We can consider > adding a > > > > > fallback behavior later **only if** user feedback shows it’s truly > > > > > needed. > > > > > > > > > > Best, > > > > > Jark Wu > > > > > > > > > > **References** > > > > > [1] > > > > > > > > > > > > > > > https://docs.starrocks.io/docs/table_design/table_types/aggregate_table/#create-a-table > > > > > [2] > > > > > > > > > > > > > > > https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree > > > > > [3] > > > > > > > > > > > > > > > https://docs.databricks.com/aws/en/sql/language-manual/functions/last_value > > > > > [4] > > > > > > > > > > > > > > > https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree > > > > > [5] > > > > > > > > > > > > > > > https://docs.databricks.com/aws/en/sql/language-manual/functions/string_agg > > > > > [6] > > > > > > > > > > > > > > > https://docs.databricks.com/aws/en/sql/language-manual/functions/array_agg > > > > > [7] > > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/ > > > > > > > > > > On Mon, 8 Dec 2025 at 15:30, Yang Wang <[email protected]> > > > > wrote: > > > > > > > > > > > > Hi Cheng, > > > > > > Thanks for the thoughtful feedback and for bringing up the > RocksDB > > > 2PC > > > > > > approach. > > > > > > You've identified the core challenge precisely: data visibility > vs. > > > > > > real-time processing. This is exactly why we chose the Undo > > Recovery > > > > > > mechanism over transaction-based approaches in the proposal. > > > > > > > > > > > > > > > > > > > > > > > > *Key Considerations:1. Real-time Visibility Conflict*As you > > > mentioned, > > > > > > RocksDB 2PC would require delayed visibility until transaction > > > commit. > > > > > For > > > > > > Fluss's positioning as a real-time streaming storage, this > > conflicts > > > > with > > > > > > our fundamental requirement that writes should be immediately > > > > queryable. > > > > > In > > > > > > typical scenarios (e.g., real-time dashboards), users expect > > > > second-level > > > > > > updates, not waiting for Flink checkpoint completion (which could > > be > > > > tens > > > > > > of seconds). > > > > > > > > > > > > *2. Already Evaluated and Rejected*We actually evaluated > > transaction > > > > > > mechanisms in the FIP design phase. From the "Rejected > > Alternatives" > > > > > > section: > > > > > > > Use Transaction Mechanism to Implement Exactly-Once > > > > > > > > > > > > > > Disadvantages: Extremely high implementation complexity, > requires > > > > > > refactoring Fluss's write path, high performance overhead > (requires > > > > > delayed > > > > > > visibility, increased commit overhead), conflicts with Fluss's > > > > real-time > > > > > > visibility design philosophy > > > > > > > > > > > > > > Rejection Reason: Cost too high, inconsistent with Fluss's > > > real-time > > > > > > streaming storage positioning > > > > > > (See FIP Section: "Rejected Alternatives") > > > > > > > > > > > > *3. Additional Complexity with RocksDB 2PC*Beyond visibility > > issues: > > > > > > > > > > > > - Distributed coordination: Requires a global transaction > > > > coordinator > > > > > > across multiple TabletServers > > > > > > - Flink checkpoint alignment: How to coordinate RocksDB commit > > > with > > > > > > asynchronous Flink checkpoints? > > > > > > - Multi-job concurrency: Column-level partial updates would > > > require > > > > > > complex transaction isolation coordination > > > > > > - Performance overhead: Prepare/commit overhead exists for > every > > > > > write, > > > > > > even in normal cases > > > > > > > > > > > > > > > > > > *4. Why Undo Recovery Fits Better*Our approach optimizes for the > > > common > > > > > > case: > > > > > > > > > > > > - Normal writes: Zero transaction overhead, immediate > visibility > > > > > > - Failover (rare): Pay the cost of undo operations only when > > > needed > > > > > > - Lightweight: Leverages existing Changelog capability, no > > global > > > > > > coordinator needed > > > > > > - Localized: Each bucket handles recovery independently via > > offset > > > > > > comparison > > > > > > > > > > > > *Summary* > > > > > > While RocksDB 2PC is theoretically cleaner from a database > > > perspective, > > > > > it > > > > > > introduces unacceptable trade-offs for Fluss's real-time > streaming > > > use > > > > > > cases. The Undo Recovery approach better aligns with our > "optimize > > > for > > > > > the > > > > > > common path" philosophy and maintains Fluss's real-time > > > > characteristics. > > > > > > Would love to discuss further if you have additional thoughts! > > > > > > > > > > > > Best regards, > > > > > > Yang > > > > > > > > > > > > > > >
