Hi, Jark.
Thanks for your comprehensive and valuable feedback; it has made the FIP
more complete. Let me respond to the comments one by one:

   - Comments (1) and (2) essentially target the same issue: for buckets
   without checkpoints, how do we determine which updates need to be rolled
   back? Persisting state to Fluss is an excellent solution. It allows us to
   distinguish whether an instance is starting for the first time or
   recovering from a failover without checkpoints. This is a very important
   suggestion. I will add a handling plan for this corner case to the FIP.
   - I strongly agree with suggestions (3), (4), (5), and (6). I will
   update the FIP accordingly.
   - Regarding suggestion (7), I have some concerns. In fact, our API
   design and implementation architecture influence each other. The core issue
   is that for the same bucket of the same table, we cannot allow the send
   queue to contain mixed WriteBatches based on different agg_modes, as this
   would lead to non-deterministic write results. To introduce agg_mode
   cleanly, we would need significant refactoring of both the upper-layer
   write API and the batching/aggregation sending architecture. This
   complexity may be unnecessary at the moment. The “Recovery-mode”
   connection is a deliberate compromise: it introduces minimal complexity
   while still providing correct semantics. Perhaps we can discuss this
   further.
   - Regarding suggestion (8), considering that users may work with very
   wide tables, they might be forced to add extra configuration items for many
   columns that have no special aggregation needs, which would hurt user
   experience and bloat configuration. Apache Paimon defaults to using the
   last_non_null_value aggregation function for unspecified columns, and I
   believe most users may already be accustomed to this behavior. It might be
   better for us to stay consistent with Paimon.
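
To make the concern about suggestion (7) more concrete, here is a rough
sketch of the invariant I have in mind: one send queue per bucket, with a
single agg mode fixed when the connection is created. This is not the
actual Fluss client code. All class and method names below are hypothetical,
and the two enum values are only borrowed from the modes named in the
suggestion.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: none of these types exist in the Fluss client.
final class AggModeSketch {

    /** Borrowed from the modes named in suggestion (7). */
    enum AggMode { REPLACE, ACCUMULATE }

    /** A batch of pending writes; every record in it shares one mode. */
    static final class WriteBatch {
        final AggMode mode;
        final int recordCount;

        WriteBatch(AggMode mode, int recordCount) {
            this.mode = mode;
            this.recordCount = recordCount;
        }
    }

    /** Send queue for one bucket; its mode is chosen once, per connection. */
    static final class BucketSendQueue {
        private final AggMode mode;
        private final Deque<WriteBatch> pending = new ArrayDeque<>();

        BucketSendQueue(AggMode mode) {
            this.mode = mode;
        }

        /** Rejects a batch whose mode differs from the queue's mode. */
        void enqueue(WriteBatch batch) {
            if (batch.mode != mode) {
                throw new IllegalStateException(
                        "queue is " + mode + " but batch is " + batch.mode);
            }
            pending.addLast(batch);
        }

        int size() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        BucketSendQueue queue = new BucketSendQueue(AggMode.ACCUMULATE);
        queue.enqueue(new WriteBatch(AggMode.ACCUMULATE, 10));
        try {
            // A batch with another mode must never reach the same queue.
            queue.enqueue(new WriteBatch(AggMode.REPLACE, 1));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("pending batches: " + queue.size());
    }
}
```

With the mode fixed per connection, this check is trivially satisfied. With
a per-request agg_mode, the batching layer itself would have to enforce it,
which is where the refactoring cost comes from.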

Best regards,
Yang

On Sun, 14 Dec 2025 at 23:37, Jark Wu <[email protected]> wrote:

> Hi Yang,
>
> Thanks for the great proposal, it’s very detailed and covers all
> aspects comprehensively.
>
> I have the following comments on the design:
>
> **(1) Handling failover before the first checkpoint is completed**
> If a job starts, writes some data, but fails over before completing its
> first checkpoint, the undo log mechanism won’t be triggered, leading
> to duplicate data. To address this, I think we may need to rely on
> Fluss to store the initial offset state.
>
> One possible solution:
> ① Only the Coordinator can accept `AcquireColumnLockRequest`. The
> request can carry offset information, and the Coordinator persists the
> `owner-id`, `columns`, and `offsets` to ZooKeeper.
> ② At job startup, Sink Task-0 fetches a full snapshot of the table’s
> offsets and registers a column lock with the Fluss Coordinator.
> ③ Upon failover, the sink checks whether it’s recovering from state or
> starting statelessly. In the stateless case, it retrieves the
> persisted offsets for the `owner-id` from the Coordinator/ZooKeeper,
> compares them with the latest cluster offsets, and then reconstructs
> the redo log accordingly.
>
> **Tip**: We don’t need `schema_id` in `AcquireColumnLockRequest` if we
> use column ids instead of column indexes.
>
> **(2) Handling Dynamic Partitioning**
> When dynamic partitioning is enabled on a table, a similar issue
> arises: new partitions may be created and written to, but if failover
> occurs before the first checkpoint, duplicate data can appear in those
> new partitions.
>
> To handle this, during failover recovery from state, we must retrieve
> **all partitions and their corresponding offsets**. If a partition’s
> offset is missing from the state, we should start consuming its redo
> log from offset `0`.
>
> **(3) Improvements to lock owner ID and TTL parameters**
> The current parameters `client.writer.lock-owner-id` and
> `client.column-lock.default-ttl` can be simplified and unified as:
> - `client.column-lock.owner-id`
> - `client.column-lock.ttl`
>
> Moreover, these parameters should work **by default without explicit
> configuration**. For example, we can generate a default `owner-id`
> from the Flink Job ID + column index:
> - The Job ID ensures a new ID for every fresh job submission.
> - During failover restarts, the Job ID remains unchanged.
>
> Additionally, we should **checkpoint the `owner-id` into the sink
> state**, so it remains consistent during failover or job version
> upgrades.
>
> **(4) Connector Options & TableDescriptor Configuration API**
> Currently, users must configure aggregate columns via connector
> options in the Java client. However, we consider aggregate functions
> part of the **schema definition**, similar to how StarRocks [1] and
> ClickHouse [2] define aggregation directly in column DDL (e.g., `pv
> BIGINT SUM`).
>
> The need for options today stems from Flink SQL’s lack of support for
> such DDL syntax. But for future engine integrations, we’d prefer a
> native DDL-based approach.
>
> Therefore:
>
> Add a method in the `Schema`/`SchemaBuilder` layer:
>
> ```java
> aggColumn(String columnName, DataType dataType, AggFunction aggFunction)
> ```
> The `AggFunction` should be an Enum type. This method is similar to
> how we added `incrementColumn`.
>
> Then rename `table.fields.total_orders.aggregate-function` to the
> connector option `fields.total_orders.agg`. We can shorten
> `aggregate-function` to `agg` here. The connector option is translated
> into the schema builder.
>
>
> **(5) Default value for `table.column-lock.enabled`**
> Currently, this is `false` by default. However, if the aggregate merge
> engine **requires** column locking, Fluss coordinator can **enable it
> automatically** when creating an agg merge engine table, unless the
> user explicitly sets it to `false`.
>
> **(6) Aggregate functions**
> The behavior of `last_value` ignoring `NULL`s is standardized via the
> `IGNORE NULLS` clause (e.g., in Databricks [3] and ClickHouse [4]).
> Thus, I suggest renaming:
> - `last_non_null_value` → `last_value_ignore_nulls`
> - `first_non_null_value` → `first_value_ignore_nulls`
>
> Additionally, we should also introduce:
> - `string_agg` (alias for `listagg`) [5]
> - `array_agg` (returns `ARRAY<T>`) [6]
>
> **(7) Connection Interface for Recovery Mode**
> Exposing a "recovery mode" at the `Connection` level leaks too much
> internal complexity. The `Connection` is the primary API entry point,
> while redo log handling is an advanced feature rarely used directly by
> end users.
>
> Instead, I suggest embedding the override mode directly in
> `PutKvRequest` via a new field:
> ```proto
> // unset (default): replace, 0: replace, 1: accumulate, 2: merge_state
> optional int32 agg_mode = 6;
> ```
>
> In the future, `merge_state` mode could allow the sink to perform
> **local aggregation** and send the aggregated state to the server for
> direct merging to reduce RocksDB write amplification. This would be a
> valuable production optimization, analogous to Flink’s mini-batch [7].
>
> **(8) If aggregate-function is not configured, default to last_non_null_value**
> I suggest **requiring users to explicitly configure an aggregate
> function**, as this makes semantics clearer. We can consider adding a
> fallback behavior later **only if** user feedback shows it’s truly
> needed.
>
> Best,
> Jark Wu
>
> **References**
> [1]
> https://docs.starrocks.io/docs/table_design/table_types/aggregate_table/#create-a-table
> [2]
> https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> [3]
> https://docs.databricks.com/aws/en/sql/language-manual/functions/last_value
> [4]
> https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> [5]
> https://docs.databricks.com/aws/en/sql/language-manual/functions/string_agg
> [6]
> https://docs.databricks.com/aws/en/sql/language-manual/functions/array_agg
> [7]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/
>
> On Mon, 8 Dec 2025 at 15:30, Yang Wang <[email protected]> wrote:
> >
> > Hi Cheng,
> > Thanks for the thoughtful feedback and for bringing up the RocksDB 2PC
> > approach.
> > You've identified the core challenge precisely: data visibility vs.
> > real-time processing. This is exactly why we chose the Undo Recovery
> > mechanism over transaction-based approaches in the proposal.
> >
> >
> >
> > *Key Considerations:*
> >
> > *1. Real-time Visibility Conflict*
> > As you mentioned, RocksDB 2PC would require delayed visibility until
> > transaction commit. For Fluss's positioning as a real-time streaming
> > storage, this conflicts with our fundamental requirement that writes
> > should be immediately queryable. In typical scenarios (e.g., real-time
> > dashboards), users expect second-level updates, not waiting for Flink
> > checkpoint completion (which could be tens of seconds).
> >
> > *2. Already Evaluated and Rejected*
> > We actually evaluated transaction mechanisms in the FIP design phase.
> > From the "Rejected Alternatives" section:
> > > Use Transaction Mechanism to Implement Exactly-Once
> > >
> > > Disadvantages: Extremely high implementation complexity, requires
> > > refactoring Fluss's write path, high performance overhead (requires
> > > delayed visibility, increased commit overhead), conflicts with Fluss's
> > > real-time visibility design philosophy
> > >
> > > Rejection Reason: Cost too high, inconsistent with Fluss's real-time
> > > streaming storage positioning
> > (See FIP Section: "Rejected Alternatives")
> >
> > *3. Additional Complexity with RocksDB 2PC*
> > Beyond visibility issues:
> >
> >    - Distributed coordination: Requires a global transaction coordinator
> >    across multiple TabletServers
> >    - Flink checkpoint alignment: How to coordinate RocksDB commit with
> >    asynchronous Flink checkpoints?
> >    - Multi-job concurrency: Column-level partial updates would require
> >    complex transaction isolation coordination
> >    - Performance overhead: Prepare/commit overhead exists for every
> >    write, even in normal cases
> >
> >
> > *4. Why Undo Recovery Fits Better*
> > Our approach optimizes for the common case:
> >
> >    - Normal writes: Zero transaction overhead, immediate visibility
> >    - Failover (rare): Pay the cost of undo operations only when needed
> >    - Lightweight: Leverages existing Changelog capability, no global
> >    coordinator needed
> >    - Localized: Each bucket handles recovery independently via offset
> >    comparison
> >
> > *Summary*
> > While RocksDB 2PC is theoretically cleaner from a database perspective,
> > it introduces unacceptable trade-offs for Fluss's real-time streaming use
> > cases. The Undo Recovery approach better aligns with our "optimize for
> > the common path" philosophy and maintains Fluss's real-time
> > characteristics.
> > Would love to discuss further if you have additional thoughts!
> >
> > Best regards,
> > Yang
>
