Hi Jark,

Thanks for the thoughtful feedback. The API changes are reasonable to me, and the FIP has already been updated.
Best,
Yunhong

On 2025/12/21 14:37:45 Jark Wu wrote:
> Thanks for the great proposal.
>
> I think the current terminology may cause confusion about the
> relationship between "snapshot consumer" and Kafka-style consumer
> groups, similar to the consumer framework question raised by Liebing.
>
> Paimon uses the term "consumer ID" because its semantics are indeed
> analogous to Kafka consumer groups. However, Fluss’s snapshot
> "consumer" is fundamentally different: it’s solely used to pin KV
> snapshot files and does not track consumption offsets at all.
>
> Since this mechanism is designed specifically to prevent deletion and
> is typically time-bound (i.e., the snapshot shouldn’t be retained
> indefinitely if the client crashes), we should avoid the term
> "Consumer": it usually implies long-lived, streaming consumption, and
> it would conflict with a future "consumer group" concept in Fluss.
>
> Therefore, I suggest naming it "KV Snapshot Lease".
>
> The client "leases" the snapshot for a defined period. Once the lease
> expires and isn’t renewed, the server may safely reclaim the snapshot.
> The term "lease" is widely used in distributed systems (e.g., HDFS,
> etcd) for exactly this purpose.
>
> We can then update the interfaces accordingly:
>
> ```
> CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
>     String leaseId,
>     Map<TableBucket, Long> snapshotIds,
>     long leaseDuration);
>
> CompletableFuture<Void> renewKvSnapshotLease(
>     String leaseId,
>     long leaseDuration);
>
> CompletableFuture<Void> releaseKvSnapshotLease(
>     String leaseId,
>     Set<TableBucket> bucketsToRelease);
>
> CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
> ```
>
> What do you think?
>
> Other minor suggestions:
>
> (1) I think we may need to use a compacted binary format for the
> zknode instead of using JSON. I'm afraid it will easily be greater
> than 1MB if using JSON.
>
> (2) We should retain at least 2 snapshots by default, and as a best
> practice.
> This doesn't add much remote storage (only one incremental snapshot's
> worth), at the cost of doubling the ZK node overhead. However, it can
> greatly simplify the implementation by avoiding the need to deal with
> the `failedTableBucketSet`. We can still introduce the
> `failedTableBucketSet`, but use it as a future optimization to reduce
> the numRetain to one.
>
> Best,
> Jark
>
> On Sun, 21 Dec 2025 at 11:27, Yunhong Zheng <[email protected]> wrote:
> >
> > Hi Liebing,
> >
> > Thanks for the thoughtful feedback. We indeed need to consider the
> > problem of partition expiration. In the short term, I personally
> > think we could choose to ignore partition expiration, i.e., prevent
> > expired partitions from triggering failover in read jobs or tiering
> > jobs. There's no need to introduce partition-consumer logic, which
> > would increase implementation complexity. What do you think?
> >
> > Best regards,
> > Yunhong
> >
> > On 2025/12/18 07:01:10 Liebing Yu wrote:
> > > Hi Yunhong!
> > >
> > > Thanks for your great work. I have a few questions:
> > >
> > > 1. Regarding Partition Expiration and Data Consistency:
> > > For KV tables, snapshot expiration significantly impacts
> > > consumers. However, for both KV and Log tables, partition
> > > expiration is another critical factor. Imagine a Flink job reading
> > > a Fluss partition that expires due to TTL; in this case, data
> > > already written to that partition will not be fully consumed.
> > >
> > > This creates a more pronounced data consistency issue in "union
> > > read" scenarios (integrated lake-stream tables). Consider this
> > > scenario: a Flink job starts consuming a partition where some data
> > > has already been synced to the lake by the tiering service. If the
> > > partition expires in Fluss during consumption (even if its data
> > > has been fully synced to the lake), the Flink job will end up with
> > > incomplete streaming data.
> > > However, a subsequent batch read of the same partition would
> > > retrieve the full data from the lake. This leads to a stream-batch
> > > inconsistency: the same partition yields different results
> > > depending on whether it's read via the stream or the batch path.
> > >
> > > 2. Regarding the Consumer Framework:
> > > Beyond the KV consumer, the issues mentioned above suggest that we
> > > may need a Partition Consumer in the future. In fact, it might be
> > > beneficial to introduce a generic Consumer concept for any
> > > ordinary job, allowing Fluss to monitor the consumption progress
> > > of various clients.
> > >
> > > My question is: should we take these factors into account now and
> > > establish a more comprehensive consumer framework? This would make
> > > it much easier to integrate new consumer types in the future.
> > >
> > > Best regards,
> > > Liebing Yu
> > >
> > > On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]> wrote:
> > >
> > > > Hi Yang,
> > > >
> > > > Thanks for the thoughtful feedback. Your analysis has
> > > > significantly improved the stability of introducing this
> > > > feature. I will address each question individually:
> > > >
> > > > Re question 1:
> > > >
> > > > Good catch! I encountered this issue during testing as well. The
> > > > key problem is the time gap between fetching the latest kv
> > > > snapshot and registering the kv snapshot consumer, which are not
> > > > atomic operations. This was considered during the design phase,
> > > > but to avoid altering the semantics of
> > > > Admin#getLatestKvSnapshots, we chose to keep registering the kv
> > > > snapshot consumer as a separate API.
> > > > The solution is to add a retry mechanism: the buckets listed in
> > > > the failedTableBucketSet of the RegisterKvSnapshotResult
> > > > returned by registerKvSnapshotConsumer must be retried
> > > > repeatedly (get latest snapshot -> register kv snapshot
> > > > consumer) until all buckets are successfully registered.
> > > >
> > > > ------
> > > > Re question 2:
> > > >
> > > > It's indeed worth considering continuing to retain multiple
> > > > snapshots per table bucket to reduce the impact on stability.
> > > > The configuration 'kv.snapshot.num-retained' will also be kept
> > > > as is for now. However, there might still be a potential issue,
> > > > excessive ZooKeeper metadata, which can be addressed in a
> > > > separate FIP or issue.
> > > >
> > > > ------
> > > > Re question 3:
> > > >
> > > > The following scenarios can be considered rare cases or misuse:
> > > >
> > > > 1. A job fails over continuously and is restored after a day or
> > > > longer.
> > > > 2. Consuming a snapshot takes an exceptionally long time.
> > > > 3. The same consumerId is used by multiple jobs to consume
> > > > multiple tables.
> > > >
> > > > These scenarios may lead to the snapshot being cleared before
> > > > the user consumes it. For such cases, we can document them in
> > > > the user guide, and users can also configure a larger expiration
> > > > time to mitigate the problem.
> > > >
> > > > ------
> > > > Re question 4:
> > > >
> > > > This question involves a trade-off between two strategies:
> > > >
> > > > 1. Expiration time based on the first registration: this
> > > > accelerates the consumer's TTL, preventing snapshots from being
> > > > retained for extended periods and avoiding metadata bloat.
> > > > 2. Expiration time based on the latest registration: this
> > > > minimizes the risk of snapshots being cleaned up during
> > > > consumption.
> > > > I personally lean toward the first strategy, but the second is
> > > > also valid.
> > > >
> > > > Best regards,
> > > > Yunhong
> > > >
> > > > On 2025/12/16 08:19:54 Yang Wang wrote:
> > > > > Hi Yunhong,
> > > > >
> > > > > Thank you for driving this very useful initiative; it will
> > > > > significantly improve the stability of data consumption and
> > > > > the overall user experience. I generally agree with most
> > > > > aspects of the design, but I have a few minor questions:
> > > > >
> > > > > 1. When Flink enumerates the snapshots it needs to subscribe
> > > > > to and attempts to subscribe, what is the expected behavior if
> > > > > a snapshot gets deleted just before the subscription request
> > > > > is sent? (This scenario might occur in certain edge cases.)
> > > > >
> > > > > 2. Retaining only one snapshot by default could be risky.
> > > > > Currently, neither the CoordinatorServer nor the TabletServer
> > > > > can fully guarantee that snapshots registered in ZooKeeper are
> > > > > always reliable and valid (although this is the design
> > > > > intention, there might still be undiscovered bugs).
> > > > >
> > > > > 3. I didn’t see any API for updating subscriptions. If a Flink
> > > > > job experiences a prolonged failover for some reason, would
> > > > > the unread snapshots expire as a result?
> > > > >
> > > > > 4. Related to the previous question: is the TTL of a snapshot
> > > > > calculated from the time it was registered, or from the last
> > > > > time a consumer updated its subscription?
> > > > > Best regards,
> > > > > Yang
> > > > >
> > > > > On Thu, 11 Dec 2025 at 22:40, yunhong Zheng <[email protected]> wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Currently, for the Fluss PrimaryKey Table, the number of kv
> > > > > > snapshots retained per bucket is controlled by the server
> > > > > > option `kv.snapshot.num-retained` (default value: 1). If
> > > > > > this value is set too small, kv snapshots that are being
> > > > > > actively consumed may be deleted while a consumer is still
> > > > > > reading them.
> > > > > >
> > > > > > In that case, a Flink job reading the PrimaryKey table will
> > > > > > fail and cannot be restarted from its previous state.
> > > > > >
> > > > > > To avoid this, the fluss server needs to be aware of which
> > > > > > consumers are actively consuming the corresponding kv
> > > > > > snapshots, and must not delete the kv snapshots that are
> > > > > > currently being consumed.
> > > > > >
> > > > > > So, I'd like to propose FIP-22: Support Kv Snapshot
> > > > > > Consumer[1].
> > > > > >
> > > > > > Any feedback or suggestions on this proposal are welcome!
> > > > > >
> > > > > > [1]:
> > > > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> > > > > >
> > > > > > Regards,
> > > > > > Yunhong
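As an aside for readers of this thread, the lease lifecycle discussed above (acquire a lease for a duration, renew it before expiry, let the server reclaim the snapshot once every lease has expired or been released) can be modeled in a few lines of Java. This is a minimal illustrative sketch, not Fluss code: the class and method names below are hypothetical, and only the lifecycle semantics mirror the proposed acquire/renew/release API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal in-memory model of the proposed KV Snapshot Lease semantics:
// a snapshot is protected from deletion while at least one unexpired
// lease pins it. All names here are illustrative, not Fluss APIs.
public class KvSnapshotLeaseModel {
    // leaseId -> absolute expiration timestamp (millis)
    private final Map<String, Long> leaseExpiry = new HashMap<>();

    // The client "leases" the snapshot for a defined period.
    public void acquire(String leaseId, long nowMs, long leaseDurationMs) {
        leaseExpiry.put(leaseId, nowMs + leaseDurationMs);
    }

    // Renewing extends the deadline from "now", not from the old deadline.
    public boolean renew(String leaseId, long nowMs, long leaseDurationMs) {
        if (!leaseExpiry.containsKey(leaseId)) {
            return false; // lease already reclaimed; caller must re-acquire
        }
        leaseExpiry.put(leaseId, nowMs + leaseDurationMs);
        return true;
    }

    // Explicit release by a client that finished reading the snapshot.
    public void release(String leaseId) {
        leaseExpiry.remove(leaseId);
    }

    // Server side: drop expired leases, then the snapshot may be
    // reclaimed once no live lease pins it.
    public boolean canReclaim(long nowMs) {
        leaseExpiry.values().removeIf(expiry -> expiry <= nowMs);
        return leaseExpiry.isEmpty();
    }

    public static void main(String[] args) {
        KvSnapshotLeaseModel model = new KvSnapshotLeaseModel();
        model.acquire("flink-job-1", 0L, 1000L);
        System.out.println(model.canReclaim(500L));   // lease still live
        System.out.println(model.renew("flink-job-1", 900L, 1000L));
        System.out.println(model.canReclaim(1500L));  // renewed until t=1900
        System.out.println(model.canReclaim(2000L));  // expired, reclaimable
    }
}
```

Note that a crashed client simply stops renewing, so its lease lapses on its own; this is what makes the mechanism time-bound, in contrast to a consumer-group-style registration that would pin the snapshot indefinitely.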
