Hi Jark,

Thanks for the thoughtful feedback.  The API changes are reasonable to me, and 
the FIP has already been updated.

Best,
Yunhong

On 2025/12/21 14:37:45 Jark Wu wrote:
> Thanks for the great proposal.
> 
> I think the current terminology may cause confusion about the
> relationship between "snapshot consumer" and Kafka-style consumer
> groups, similar to the consumer framework question raised by Liebing.
> 
> Paimon uses the term "consumer ID" because its semantics are indeed
> analogous to Kafka consumer groups. However, Fluss’s snapshot
> "consumer" is fundamentally different: it’s solely used to pin KV
> snapshot files and does not track consumption offsets at all.
> 
> Since this mechanism is designed specifically to prevent deletion and
> is typically time-bound (i.e., the snapshot shouldn’t be retained
> indefinitely if the client crashes), we should avoid the term
> "Consumer", as it usually implies long-lived, streaming consumption,
> and to avoid conflicts with the future "consumer group" in Fluss.
> 
> Therefore, I suggest naming it "KV Snapshot Lease".
> 
> The client "leases" the snapshot for a defined period. Once the lease
> expires and isn’t renewed, the server may safely reclaim the snapshot.
> The term "lease" is widely used in distributed systems (e.g., HDFS,
> etcd) for exactly this purpose.
> 
> We can then update the interfaces accordingly:
> 
> ```
> CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
> String leaseId,
> Map<TableBucket, Long> snapshotIds,
> long leaseDuration);
> 
> CompletableFuture<Void> renewKvSnapshotLease(
> String leaseId,
> long leaseDuration);
> 
> CompletableFuture<Void> releaseKvSnapshotLease(
> String leaseId,
> Set<TableBucket> bucketsToRelease);
> 
> CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
> ```
> 
> What do you think?
> 
> Other minor suggestions:
> 
> (1) I think we may need to use a compacted binary format for the
> zknode instead of using JSON. I'm afraid it will easily be greater
> than 1MB if using JSON.
> 
> (2) We should at least retain 2 snapshots by default and for best
> practice. This doesn't increase lots of remote storage size (only one
> incremental size) and double zk node overhead. However, this can
> greatly simplify the implementaion without dealing the
> `failedTableBucketSet`. We can still introduce the
> `failedTableBucketSet`, but use it for optimization in the future to
> reduce the numRetain to one.
> 
> Best,
> Jark
> 
> On Sun, 21 Dec 2025 at 11:27, Yunhong Zheng <[email protected]> wrote:
> >
> > Hi Liebing,
> >
> > Thanks for the thoughtful feedback. We indeed need to consider the problem 
> > of partition expiration. In the short term, I personally think we could 
> > choose to ignore partition expiration—preventing expired partitions from 
> > triggering failover in read jobs or tiering jobs. There's no need to 
> > introduce partition consumer logic, which would increase implementation 
> > complexity. What do you think?
> >
> > Best regards,
> > Yunhong
> >
> > On 2025/12/18 07:01:10 Liebing Yu wrote:
> > > Hi Yunhong!
> > >
> > > Thanks for your great work. I have a few questions:
> > >
> > > 1. Regarding Partition Expiration and Data Consistency:
> > > For KV tables, snapshot expiration significantly impacts consumers.
> > > However, for both KV and Log tables, partition expiration is another
> > > critical factor. Imagine a Flink job reading a Fluss partition that 
> > > expires
> > > due to TTL; in this case, data already written to that partition will not
> > > be fully consumed.
> > >
> > > This creates a more pronounced data consistency issue in "union read"
> > > scenarios (integrated lake-stream tables). Consider this scenario:
> > > A Flink job starts consuming a partition where some data has already been
> > > synced to the lake by the tiering service. If the partition expires in
> > > Fluss during the consumption process (even if its data has been fully
> > > synced to the lake), the Flink job will end up with incomplete streaming
> > > data. However, a subsequent batch read of the same partition would 
> > > retrieve
> > > the full data from the lake. This leads to a stream-batch 
> > > inconsistency—the
> > > same partition yields different results depending on whether it's read via
> > > the stream or the batch path.
> > >
> > > 2. Regarding the Consumer Framework:
> > > Beyond the KV consumer, the issues mentioned above suggest that we may 
> > > need
> > > a Partition Consumer in the future. In fact, it might be beneficial to
> > > introduce a generic Consumer concept for any ordinary job, allowing Fluss
> > > to monitor the consumption progress of various clients.
> > >
> > > My question is: Should we take these factors into account now and 
> > > establish
> > > a more comprehensive consumer framework? This would make it much easier to
> > > integrate new consumer types in the future.
> > >
> > >
> > > Best regards,
> > > Liebing Yu
> > >
> > >
> > > On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]> wrote:
> > >
> > > > Hi Yang,
> > > >
> > > > Thanks for the thoughtful feedback. Your analysis has significantly
> > > > improved the stability of introducing the this function. I will address
> > > > each problem individually:
> > > >
> > > > Re for question 1:
> > > >
> > > > Good catch!  I encountered this issue during testing as well. The key
> > > > problem is the time gap between fetching the latest kv snapshot and
> > > > registering the kv snapshot consumer, which are not atomic operations. 
> > > > This
> > > > was considered during the design phase, but to avoid altering the 
> > > > semantics
> > > > of Admin#getLatestKvSnapshots, we chose to keep the register kv snapshot
> > > > consumer as a separate API.
> > > >
> > > > The solution is to add a retry mechanism: the failedTableBucketSet in 
> > > > the
> > > > RegisterKvSnapshotResult returned by registerKvSnapshotConsumer must be
> > > > retried repeatedly (Get latest snapshot -> register kv snapshot 
> > > > consumer)
> > > > until all buckets are successfully registered.
> > > >
> > > > ------
> > > > Re for question 2:
> > > >
> > > > It's indeed worth considering continuing to retain multiple snapshots 
> > > > for
> > > > one table bucket to reduce the impact on stability. The configuration
> > > > 'kv.snapshot.num-retained' will also be kept as is for now. However, 
> > > > there
> > > > might still be a potential issue: excessive ZooKeeper metadata, which 
> > > > can
> > > > be addressed as a separate FIP or issue.
> > > >
> > > > ------
> > > > Re for question 3:
> > > >
> > > > For the following scenarios, we can consider them as rare cases or 
> > > > misuse:
> > > >
> > > > 1. A job continue failover and is restored after a day or longer.
> > > > 2. Consuming a snapshot takes an exceptionally long time.
> > > > 3. The same consumerId is used by multiple jobs to consume multiple 
> > > > tables.
> > > >
> > > > These scenarios may lead to the snapshot being cleared before the user
> > > > consumes it. For such cases, we can document them in the user guide, and
> > > > users can also configure a larger expiration time to mitigate this 
> > > > problem.
> > > >
> > > >
> > > > -----
> > > > Re for question 4:
> > > >
> > > > This question involves a trade-off between two strategies:
> > > >
> > > > 1. Expiration time based on the first registration: This accelerates the
> > > > consumer's TTL, preventing snapshots from being retained for extended
> > > > periods and avoiding metadata bloat.
> > > > 2. Expiration time based on the latest registration: This minimizes the
> > > > risk of snapshots being cleaned up during consumption.
> > > >
> > > > I personally lean toward the first strategy, but the second is also 
> > > > valid.
> > > >
> > > > Best regards,
> > > > Yunhong
> > > >
> > > > On 2025/12/16 08:19:54 Yang Wang wrote:
> > > > > Hi Yunhong,
> > > > >
> > > > > Thank you for driving this very useful initiative—it will 
> > > > > significantly
> > > > > improve the stability of data consumption and the overall user
> > > > experience.
> > > > > I generally agree with most aspects of the design, but I have a few 
> > > > > minor
> > > > > questions:
> > > > >
> > > > >    1.
> > > > >
> > > > >    When Flink enumerates the snapshots it needs to subscribe to and
> > > > >    attempts to subscribe, what is the expected behavior if a snapshot
> > > > gets
> > > > >    deleted just before the subscription request is sent? (This 
> > > > > scenario
> > > > might
> > > > >    occur in certain edge cases.)
> > > > >    2.
> > > > >
> > > > >    Retaining only one snapshot by default could be risky. Currently,
> > > > >    neither the CoordinatorServer nor the TabletServer can fully
> > > > guarantee that
> > > > >    snapshots registered in ZooKeeper are always reliable and valid
> > > > (although
> > > > >    this is the design intention, there might still be undiscovered 
> > > > > bugs).
> > > > >    3.
> > > > >
> > > > >    I didn’t see any API for updating subscriptions. If a Flink job
> > > > >    experiences a prolonged failover for some reason, would the unread
> > > > >    snapshots expire as a result?
> > > > >    4.
> > > > >
> > > > >    Related to the previous question: Is the TTL of a snapshot 
> > > > > calculated
> > > > >    from the time it was registered, or from the last time a consumer
> > > > updated
> > > > >    its subscription?
> > > > >
> > > > > Best regards,
> > > > > Yang
> > > > >
> > > > > yunhong Zheng <[email protected]> 于2025年12月11日周四 22:40写道:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Currently, for Fluss PrimaryKey Table, the number of kv snapshots
> > > > retained
> > > > > > per bucket is controlled by the server
> > > > > > option `kv.snapshot.num-retained`  (default value: 1). If this value
> > > > is set
> > > > > > too small, Kv snapshots that are being actively consumed may be 
> > > > > > deleted
> > > > > > while a consumer is still consuming them.
> > > > > >
> > > > > > This case will cause a Flink job which read PrimaryKey table fail 
> > > > > > and
> > > > > > cannot be restarted from its previous state.
> > > > > >
> > > > > > To avoid this case, the fluss server needs to be aware of which
> > > > consumers
> > > > > > are actively consuming the corresponding kv snapshots, and can not
> > > > delete
> > > > > > these kv snapshots that are currently being consumed.
> > > > > >
> > > > > > So, I'd like to propose FIP-22: Support Kv Snapshot Consumer[1].
> > > > > >
> > > > > > Any feedback are suggestions on this proposal are welcome!
> > > > > >
> > > > > > [1]:
> > > > > >
> > > > > >
> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> > > > > >
> > > > > > Regards,
> > > > > > Yunhong
> > > > > >
> > > > >
> > > >
> > >
> 

Reply via email to