Thanks for the great proposal. I think the current terminology may cause confusion about the relationship between "snapshot consumer" and Kafka-style consumer groups, similar to the consumer framework question raised by Liebing.
Paimon uses the term "consumer ID" because its semantics are indeed analogous to Kafka consumer groups. However, Fluss’s snapshot "consumer" is fundamentally different: it’s solely used to pin KV snapshot files and does not track consumption offsets at all. Since this mechanism is designed specifically to prevent deletion and is typically time-bound (i.e., the snapshot shouldn’t be retained indefinitely if the client crashes), we should avoid the term "Consumer", as it usually implies long-lived, streaming consumption, and to avoid conflicts with the future "consumer group" in Fluss. Therefore, I suggest naming it "KV Snapshot Lease". The client "leases" the snapshot for a defined period. Once the lease expires and isn’t renewed, the server may safely reclaim the snapshot. The term "lease" is widely used in distributed systems (e.g., HDFS, etcd) for exactly this purpose. We can then update the interfaces accordingly: ``` CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease( String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration); CompletableFuture<Void> renewKvSnapshotLease( String leaseId, long leaseDuration); CompletableFuture<Void> releaseKvSnapshotLease( String leaseId, Set<TableBucket> bucketsToRelease); CompletableFuture<Void> dropKvSnapshotLease(String leaseId); ``` What do you think? Other minor suggestions: (1) I think we may need to use a compacted binary format for the zknode instead of using JSON. I'm afraid it will easily be greater than 1MB if using JSON. (2) We should at least retain 2 snapshots by default and for best practice. This doesn't increase lots of remote storage size (only one incremental size) and double zk node overhead. However, this can greatly simplify the implementaion without dealing the `failedTableBucketSet`. We can still introduce the `failedTableBucketSet`, but use it for optimization in the future to reduce the numRetain to one. Best, Jark On Sun, 21 Dec 2025 at 11:27, Yunhong Zheng <[email protected]> wrote: > > Hi Liebing, > > Thanks for the thoughtful feedback. We indeed need to consider the problem of > partition expiration. In the short term, I personally think we could choose > to ignore partition expiration—preventing expired partitions from triggering > failover in read jobs or tiering jobs. There's no need to introduce partition > consumer logic, which would increase implementation complexity. What do you > think? > > Best regards, > Yunhong > > On 2025/12/18 07:01:10 Liebing Yu wrote: > > Hi Yunhong! > > > > Thanks for your great work. I have a few questions: > > > > 1. Regarding Partition Expiration and Data Consistency: > > For KV tables, snapshot expiration significantly impacts consumers. > > However, for both KV and Log tables, partition expiration is another > > critical factor. Imagine a Flink job reading a Fluss partition that expires > > due to TTL; in this case, data already written to that partition will not > > be fully consumed. > > > > This creates a more pronounced data consistency issue in "union read" > > scenarios (integrated lake-stream tables). Consider this scenario: > > A Flink job starts consuming a partition where some data has already been > > synced to the lake by the tiering service. If the partition expires in > > Fluss during the consumption process (even if its data has been fully > > synced to the lake), the Flink job will end up with incomplete streaming > > data. However, a subsequent batch read of the same partition would retrieve > > the full data from the lake. This leads to a stream-batch inconsistency—the > > same partition yields different results depending on whether it's read via > > the stream or the batch path. > > > > 2. Regarding the Consumer Framework: > > Beyond the KV consumer, the issues mentioned above suggest that we may need > > a Partition Consumer in the future. In fact, it might be beneficial to > > introduce a generic Consumer concept for any ordinary job, allowing Fluss > > to monitor the consumption progress of various clients. > > > > My question is: Should we take these factors into account now and establish > > a more comprehensive consumer framework? This would make it much easier to > > integrate new consumer types in the future. > > > > > > Best regards, > > Liebing Yu > > > > > > On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]> wrote: > > > > > Hi Yang, > > > > > > Thanks for the thoughtful feedback. Your analysis has significantly > > > improved the stability of introducing the this function. I will address > > > each problem individually: > > > > > > Re for question 1: > > > > > > Good catch! I encountered this issue during testing as well. The key > > > problem is the time gap between fetching the latest kv snapshot and > > > registering the kv snapshot consumer, which are not atomic operations. > > > This > > > was considered during the design phase, but to avoid altering the > > > semantics > > > of Admin#getLatestKvSnapshots, we chose to keep the register kv snapshot > > > consumer as a separate API. > > > > > > The solution is to add a retry mechanism: the failedTableBucketSet in the > > > RegisterKvSnapshotResult returned by registerKvSnapshotConsumer must be > > > retried repeatedly (Get latest snapshot -> register kv snapshot consumer) > > > until all buckets are successfully registered. > > > > > > ------ > > > Re for question 2: > > > > > > It's indeed worth considering continuing to retain multiple snapshots for > > > one table bucket to reduce the impact on stability. The configuration > > > 'kv.snapshot.num-retained' will also be kept as is for now. However, there > > > might still be a potential issue: excessive ZooKeeper metadata, which can > > > be addressed as a separate FIP or issue. > > > > > > ------ > > > Re for question 3: > > > > > > For the following scenarios, we can consider them as rare cases or misuse: > > > > > > 1. A job continue failover and is restored after a day or longer. > > > 2. Consuming a snapshot takes an exceptionally long time. > > > 3. The same consumerId is used by multiple jobs to consume multiple > > > tables. > > > > > > These scenarios may lead to the snapshot being cleared before the user > > > consumes it. For such cases, we can document them in the user guide, and > > > users can also configure a larger expiration time to mitigate this > > > problem. > > > > > > > > > ----- > > > Re for question 4: > > > > > > This question involves a trade-off between two strategies: > > > > > > 1. Expiration time based on the first registration: This accelerates the > > > consumer's TTL, preventing snapshots from being retained for extended > > > periods and avoiding metadata bloat. > > > 2. Expiration time based on the latest registration: This minimizes the > > > risk of snapshots being cleaned up during consumption. > > > > > > I personally lean toward the first strategy, but the second is also valid. > > > > > > Best regards, > > > Yunhong > > > > > > On 2025/12/16 08:19:54 Yang Wang wrote: > > > > Hi Yunhong, > > > > > > > > Thank you for driving this very useful initiative—it will significantly > > > > improve the stability of data consumption and the overall user > > > experience. > > > > I generally agree with most aspects of the design, but I have a few > > > > minor > > > > questions: > > > > > > > > 1. > > > > > > > > When Flink enumerates the snapshots it needs to subscribe to and > > > > attempts to subscribe, what is the expected behavior if a snapshot > > > gets > > > > deleted just before the subscription request is sent? (This scenario > > > might > > > > occur in certain edge cases.) > > > > 2. > > > > > > > > Retaining only one snapshot by default could be risky. Currently, > > > > neither the CoordinatorServer nor the TabletServer can fully > > > guarantee that > > > > snapshots registered in ZooKeeper are always reliable and valid > > > (although > > > > this is the design intention, there might still be undiscovered > > > > bugs). > > > > 3. > > > > > > > > I didn’t see any API for updating subscriptions. If a Flink job > > > > experiences a prolonged failover for some reason, would the unread > > > > snapshots expire as a result? > > > > 4. > > > > > > > > Related to the previous question: Is the TTL of a snapshot calculated > > > > from the time it was registered, or from the last time a consumer > > > updated > > > > its subscription? > > > > > > > > Best regards, > > > > Yang > > > > > > > > yunhong Zheng <[email protected]> 于2025年12月11日周四 22:40写道: > > > > > > > > > Hi all, > > > > > > > > > > Currently, for Fluss PrimaryKey Table, the number of kv snapshots > > > retained > > > > > per bucket is controlled by the server > > > > > option `kv.snapshot.num-retained` (default value: 1). If this value > > > is set > > > > > too small, Kv snapshots that are being actively consumed may be > > > > > deleted > > > > > while a consumer is still consuming them. > > > > > > > > > > This case will cause a Flink job which read PrimaryKey table fail and > > > > > cannot be restarted from its previous state. > > > > > > > > > > To avoid this case, the fluss server needs to be aware of which > > > consumers > > > > > are actively consuming the corresponding kv snapshots, and can not > > > delete > > > > > these kv snapshots that are currently being consumed. > > > > > > > > > > So, I'd like to propose FIP-22: Support Kv Snapshot Consumer[1]. > > > > > > > > > > Any feedback are suggestions on this proposal are welcome! > > > > > > > > > > [1]: > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer > > > > > > > > > > Regards, > > > > > Yunhong > > > > > > > > > > > > > >
