Hi Hongshun,

Thanks for the thoughtful feedback. I will address each question individually.
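Before the individual answers, here is a minimal sketch of the event-ordering model that my answer to question 2 relies on. This is illustrative only and not Fluss code: `LeaseEventOrdering`, `Acquire`, and `Release` are hypothetical stand-ins, and the real coordinator handles richer event types. The only point it demonstrates is that a single-threaded event loop applies acquire/release events strictly in the order they were submitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model of single-threaded coordinator event handling (not Fluss code). */
public class LeaseEventOrdering {
    // Hypothetical stand-ins for the real coordinator events.
    sealed interface LeaseEvent permits Acquire, Release {}
    record Acquire(String leaseId, Set<String> buckets) implements LeaseEvent {}
    record Release(String leaseId, Set<String> buckets) implements LeaseEvent {}

    // One thread => events are applied strictly in submission order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final Map<String, Set<String>> leasedBuckets = new HashMap<>();

    public CompletableFuture<Void> submit(LeaseEvent event) {
        return CompletableFuture.runAsync(() -> {
            if (event instanceof Acquire a) {
                leasedBuckets.computeIfAbsent(a.leaseId(), k -> new HashSet<>())
                        .addAll(a.buckets());
            } else if (event instanceof Release r) {
                Set<String> held = leasedBuckets.get(r.leaseId());
                if (held != null) {
                    held.removeAll(r.buckets());
                }
            }
        }, eventLoop);
    }

    public Set<String> bucketsFor(String leaseId) {
        return leasedBuckets.getOrDefault(leaseId, Set.of());
    }

    public void shutdown() {
        eventLoop.shutdown();
    }

    public static void main(String[] args) {
        LeaseEventOrdering coordinator = new LeaseEventOrdering();
        coordinator.submit(new Acquire("job-lease", Set.of("bucket-0"))).join();
        // A release and an acquire for a newly discovered partition are sent
        // back-to-back; the event loop applies them in exactly this order.
        coordinator.submit(new Release("job-lease", Set.of("bucket-0")));
        coordinator.submit(new Acquire("job-lease", Set.of("bucket-1"))).join();
        System.out.println(coordinator.bucketsFor("job-lease")); // [bucket-1]
        coordinator.shutdown();
    }
}
```

Because both requests are serialized through the same event thread, the release of bucket-0 and the acquire for the newly discovered bucket-1 can never interleave, which is why the acquire/release pair in question 2 is safe.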
Re question 1: It seems the solution is either to store `lease.id` in the Flink state or to remove the option's default value so that users must set it explicitly. For this version, I prefer having no default value, and we can gradually relax this restriction in the future. WDYT?

------
Re question 2: For newly discovered partitions, the client calls `acquireKvSnapshotLeaseRequest` again to acquire the lease, which does not conflict with release. The `acquireKvSnapshotLeaseRequest` is handled as a coordinator event by the coordinator, and events are processed in the order their requests were sent, so there should be no ordering problem.

------
Re question 3: Reducing the pressure on ZooKeeper primarily refers to limiting the storage size of individual znodes. For example, if a single znode exceeds 2MB, it can cause abnormal connection issues between the ZooKeeper client and server. Currently, the number of znodes is not a bottleneck, and this FIP also introduces the operational capability of `dropKvSnapshotLease`.

Best,
Yunhong

On 2025/12/25 03:58:29 Hongshun Wang wrote:
> Hi YunHong,
>
> Thanks for the great proposal. I have several questions:
>
> 1. It seems that scan.kv.snapshot.lease-id is controlled by configuration,
> and the default value is UUID.randomUUID(). If the user doesn't set this
> value, the UUID.randomUUID() will be changed each time the job's restarted,
> then the lease-id cannot be deleted until lease-duration (default value is
> 1 day).
>
> 2. What happens for newly discovered buckets (for example, newly created
> partition tables): will a new lease-id be created, or is the lease-id just
> reused? If the newly discovered split1 is releasing the lease-id, and the
> newly discovered split1 is renewing this lease-id, how to control the
> request order?
>
> 3. This FIP wants to decrease the pressure of zookeeper.
> But it seems that a lot of *lease nodes will be created if there are many
> flink jobs?*
>
> *Best,*
> *Hongshun*
>
> On Wed, Dec 24, 2025 at 4:25 PM Yunhong Zheng <[email protected]> wrote:
>
> > Hi Jark,
> >
> > Thanks for the thoughtful feedback. The API changes are reasonable to me,
> > and the FIP has already been updated.
> >
> > Best,
> > Yunhong
> >
> > On 2025/12/21 14:37:45 Jark Wu wrote:
> > > Thanks for the great proposal.
> > >
> > > I think the current terminology may cause confusion about the
> > > relationship between "snapshot consumer" and Kafka-style consumer
> > > groups, similar to the consumer framework question raised by Liebing.
> > >
> > > Paimon uses the term "consumer ID" because its semantics are indeed
> > > analogous to Kafka consumer groups. However, Fluss’s snapshot
> > > "consumer" is fundamentally different: it’s solely used to pin KV
> > > snapshot files and does not track consumption offsets at all.
> > >
> > > Since this mechanism is designed specifically to prevent deletion and
> > > is typically time-bound (i.e., the snapshot shouldn’t be retained
> > > indefinitely if the client crashes), we should avoid the term
> > > "Consumer", as it usually implies long-lived, streaming consumption,
> > > and to avoid conflicts with the future "consumer group" in Fluss.
> > >
> > > Therefore, I suggest naming it "KV Snapshot Lease".
> > >
> > > The client "leases" the snapshot for a defined period. Once the lease
> > > expires and isn’t renewed, the server may safely reclaim the snapshot.
> > > The term "lease" is widely used in distributed systems (e.g., HDFS,
> > > etcd) for exactly this purpose.
> > >
> > > We can then update the interfaces accordingly:
> > >
> > > ```
> > > CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
> > >     String leaseId,
> > >     Map<TableBucket, Long> snapshotIds,
> > >     long leaseDuration);
> > >
> > > CompletableFuture<Void> renewKvSnapshotLease(
> > >     String leaseId,
> > >     long leaseDuration);
> > >
> > > CompletableFuture<Void> releaseKvSnapshotLease(
> > >     String leaseId,
> > >     Set<TableBucket> bucketsToRelease);
> > >
> > > CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
> > > ```
> > >
> > > What do you think?
> > >
> > > Other minor suggestions:
> > >
> > > (1) I think we may need to use a compacted binary format for the
> > > zknode instead of using JSON. I'm afraid it will easily be greater
> > > than 1MB if using JSON.
> > >
> > > (2) We should at least retain 2 snapshots by default and for best
> > > practice. This doesn't increase lots of remote storage size (only one
> > > incremental size) and doubles the zk node overhead. However, this can
> > > greatly simplify the implementation without dealing with the
> > > `failedTableBucketSet`. We can still introduce the
> > > `failedTableBucketSet`, but use it for optimization in the future to
> > > reduce the numRetain to one.
> > >
> > > Best,
> > > Jark
> > >
> > > On Sun, 21 Dec 2025 at 11:27, Yunhong Zheng <[email protected]> wrote:
> > > >
> > > > Hi Liebing,
> > > >
> > > > Thanks for the thoughtful feedback. We indeed need to consider the
> > > > problem of partition expiration. In the short term, I personally think
> > > > we could choose to ignore partition expiration—preventing expired
> > > > partitions from triggering failover in read jobs or tiering jobs.
> > > > There's no need to introduce partition consumer logic, which would
> > > > increase implementation complexity. What do you think?
> > > >
> > > > Best regards,
> > > > Yunhong
> > > >
> > > > On 2025/12/18 07:01:10 Liebing Yu wrote:
> > > > > Hi Yunhong!
> > > > >
> > > > > Thanks for your great work.
> > > > > I have a few questions:
> > > > >
> > > > > 1. Regarding Partition Expiration and Data Consistency:
> > > > > For KV tables, snapshot expiration significantly impacts consumers.
> > > > > However, for both KV and Log tables, partition expiration is another
> > > > > critical factor. Imagine a Flink job reading a Fluss partition that
> > > > > expires due to TTL; in this case, data already written to that
> > > > > partition will not be fully consumed.
> > > > >
> > > > > This creates a more pronounced data consistency issue in "union read"
> > > > > scenarios (integrated lake-stream tables). Consider this scenario:
> > > > > A Flink job starts consuming a partition where some data has already
> > > > > been synced to the lake by the tiering service. If the partition
> > > > > expires in Fluss during the consumption process (even if its data has
> > > > > been fully synced to the lake), the Flink job will end up with
> > > > > incomplete streaming data. However, a subsequent batch read of the
> > > > > same partition would retrieve the full data from the lake. This leads
> > > > > to a stream-batch inconsistency—the same partition yields different
> > > > > results depending on whether it's read via the stream or the batch
> > > > > path.
> > > > >
> > > > > 2. Regarding the Consumer Framework:
> > > > > Beyond the KV consumer, the issues mentioned above suggest that we
> > > > > may need a Partition Consumer in the future. In fact, it might be
> > > > > beneficial to introduce a generic Consumer concept for any ordinary
> > > > > job, allowing Fluss to monitor the consumption progress of various
> > > > > clients.
> > > > >
> > > > > My question is: Should we take these factors into account now and
> > > > > establish a more comprehensive consumer framework? This would make it
> > > > > much easier to integrate new consumer types in the future.
> > > > >
> > > > > Best regards,
> > > > > Liebing Yu
> > > > >
> > > > > On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]> wrote:
> > > > >
> > > > > > Hi Yang,
> > > > > >
> > > > > > Thanks for the thoughtful feedback. Your analysis has significantly
> > > > > > improved the stability of introducing this function. I will address
> > > > > > each problem individually:
> > > > > >
> > > > > > Re for question 1:
> > > > > >
> > > > > > Good catch! I encountered this issue during testing as well. The
> > > > > > key problem is the time gap between fetching the latest kv snapshot
> > > > > > and registering the kv snapshot consumer, which are not atomic
> > > > > > operations. This was considered during the design phase, but to
> > > > > > avoid altering the semantics of Admin#getLatestKvSnapshots, we
> > > > > > chose to keep the register kv snapshot consumer as a separate API.
> > > > > >
> > > > > > The solution is to add a retry mechanism: the failedTableBucketSet
> > > > > > in the RegisterKvSnapshotResult returned by
> > > > > > registerKvSnapshotConsumer must be retried repeatedly (get latest
> > > > > > snapshot -> register kv snapshot consumer) until all buckets are
> > > > > > successfully registered.
> > > > > >
> > > > > > ------
> > > > > > Re for question 2:
> > > > > >
> > > > > > It's indeed worth considering continuing to retain multiple
> > > > > > snapshots for one table bucket to reduce the impact on stability.
> > > > > > The configuration 'kv.snapshot.num-retained' will also be kept as
> > > > > > is for now. However, there might still be a potential issue:
> > > > > > excessive ZooKeeper metadata, which can be addressed as a separate
> > > > > > FIP or issue.
> > > > > >
> > > > > > ------
> > > > > > Re for question 3:
> > > > > >
> > > > > > For the following scenarios, we can consider them as rare cases or
> > > > > > misuse:
> > > > > >
> > > > > > 1. A job continues to fail over and is restored after a day or
> > > > > > longer.
> > > > > > 2. Consuming a snapshot takes an exceptionally long time.
> > > > > > 3. The same consumerId is used by multiple jobs to consume multiple
> > > > > > tables.
> > > > > >
> > > > > > These scenarios may lead to the snapshot being cleared before the
> > > > > > user consumes it. For such cases, we can document them in the user
> > > > > > guide, and users can also configure a larger expiration time to
> > > > > > mitigate this problem.
> > > > > >
> > > > > > -----
> > > > > > Re for question 4:
> > > > > >
> > > > > > This question involves a trade-off between two strategies:
> > > > > >
> > > > > > 1. Expiration time based on the first registration: This
> > > > > > accelerates the consumer's TTL, preventing snapshots from being
> > > > > > retained for extended periods and avoiding metadata bloat.
> > > > > > 2. Expiration time based on the latest registration: This minimizes
> > > > > > the risk of snapshots being cleaned up during consumption.
> > > > > >
> > > > > > I personally lean toward the first strategy, but the second is also
> > > > > > valid.
> > > > > >
> > > > > > Best regards,
> > > > > > Yunhong
> > > > > >
> > > > > > On 2025/12/16 08:19:54 Yang Wang wrote:
> > > > > > > Hi Yunhong,
> > > > > > >
> > > > > > > Thank you for driving this very useful initiative—it will
> > > > > > > significantly improve the stability of data consumption and the
> > > > > > > overall user experience. I generally agree with most aspects of
> > > > > > > the design, but I have a few minor questions:
> > > > > > >
> > > > > > > 1. When Flink enumerates the snapshots it needs to subscribe to
> > > > > > > and attempts to subscribe, what is the expected behavior if a
> > > > > > > snapshot gets deleted just before the subscription request is
> > > > > > > sent?
> > > > > > > (This scenario might occur in certain edge cases.)
> > > > > > >
> > > > > > > 2. Retaining only one snapshot by default could be risky.
> > > > > > > Currently, neither the CoordinatorServer nor the TabletServer can
> > > > > > > fully guarantee that snapshots registered in ZooKeeper are always
> > > > > > > reliable and valid (although this is the design intention, there
> > > > > > > might still be undiscovered bugs).
> > > > > > >
> > > > > > > 3. I didn’t see any API for updating subscriptions. If a Flink
> > > > > > > job experiences a prolonged failover for some reason, would the
> > > > > > > unread snapshots expire as a result?
> > > > > > >
> > > > > > > 4. Related to the previous question: Is the TTL of a snapshot
> > > > > > > calculated from the time it was registered, or from the last time
> > > > > > > a consumer updated its subscription?
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Yang
> > > > > > >
> > > > > > > yunhong Zheng <[email protected]> wrote on Thu, 11 Dec 2025 at 22:40:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Currently, for Fluss PrimaryKey Table, the number of kv
> > > > > > > > snapshots retained per bucket is controlled by the server
> > > > > > > > option `kv.snapshot.num-retained` (default value: 1). If this
> > > > > > > > value is set too small, kv snapshots that are being actively
> > > > > > > > consumed may be deleted while a consumer is still consuming
> > > > > > > > them.
> > > > > > > >
> > > > > > > > This will cause a Flink job that reads a PrimaryKey table to
> > > > > > > > fail and be unable to restart from its previous state.
> > > > > > > >
> > > > > > > > To avoid this case, the Fluss server needs to be aware of which
> > > > > > > > consumers are actively consuming the corresponding kv
> > > > > > > > snapshots, and must not delete the kv snapshots that are
> > > > > > > > currently being consumed.
> > > > > > > >
> > > > > > > > So, I'd like to propose FIP-22: Support Kv Snapshot Consumer [1].
> > > > > > > >
> > > > > > > > Any feedback or suggestions on this proposal are welcome!
> > > > > > > >
> > > > > > > > [1]:
> > > > > > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Yunhong
