Hi YunHong,

Thanks for the great proposal. I have several questions:
1. It seems that scan.kv.snapshot.lease-id is controlled by configuration,
and the default value is UUID.randomUUID(). If the user doesn't set this
value, a new UUID.randomUUID() will be generated each time the job is
restarted, so the previous lease-id cannot be deleted until lease-duration
expires (default value: 1 day).

2. What happens for newly discovered buckets (for example, newly created
partition tables)? Will a new lease-id be created, or will the existing
lease-id be reused? If one newly discovered split is releasing the
lease-id while another is renewing it, how do we control the request
order?
3. This FIP aims to reduce the pressure on ZooKeeper, but it seems that
many lease znodes will be created if there are many Flink jobs?
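
For question 1, here is a minimal Java sketch of the concern (the
`LeaseIdDefaultSketch` class and `resolveLeaseId` helper are purely
illustrative, not actual Fluss code):

```java
import java.util.UUID;

// Hypothetical sketch of question 1: if scan.kv.snapshot.lease-id is not set,
// the default falls back to UUID.randomUUID(), so every job restart produces a
// different lease-id and the previous lease lingers until lease-duration expires.
public class LeaseIdDefaultSketch {
    // Illustrative helper, not an actual Fluss API.
    static String resolveLeaseId(String configuredLeaseId) {
        return configuredLeaseId != null ? configuredLeaseId : UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // A restarted job without an explicit lease-id gets a new random id,
        // so it cannot release the lease acquired by the previous run.
        String firstRun = resolveLeaseId(null);
        String afterRestart = resolveLeaseId(null);
        System.out.println(firstRun.equals(afterRestart)); // false

        // An explicitly configured lease-id stays stable across restarts.
        System.out.println(resolveLeaseId("my-job").equals(resolveLeaseId("my-job"))); // true
    }
}
```

If this behavior is intended, it may be worth documenting that users should
set an explicit lease-id for jobs that restart frequently.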

Best,
Hongshun

On Wed, Dec 24, 2025 at 4:25 PM Yunhong Zheng <[email protected]> wrote:

> Hi Jark,
>
> Thanks for the thoughtful feedback.  The API changes are reasonable to me,
> and the FIP has already been updated.
>
> Best,
> Yunhong
>
> On 2025/12/21 14:37:45 Jark Wu wrote:
> > Thanks for the great proposal.
> >
> > I think the current terminology may cause confusion about the
> > relationship between "snapshot consumer" and Kafka-style consumer
> > groups, similar to the consumer framework question raised by Liebing.
> >
> > Paimon uses the term "consumer ID" because its semantics are indeed
> > analogous to Kafka consumer groups. However, Fluss’s snapshot
> > "consumer" is fundamentally different: it’s solely used to pin KV
> > snapshot files and does not track consumption offsets at all.
> >
> > Since this mechanism is designed specifically to prevent deletion and
> > is typically time-bound (i.e., the snapshot shouldn't be retained
> > indefinitely if the client crashes), we should avoid the term
> > "Consumer": it usually implies long-lived, streaming consumption, and
> > it would conflict with a future "consumer group" concept in Fluss.
> >
> > Therefore, I suggest naming it "KV Snapshot Lease".
> >
> > The client "leases" the snapshot for a defined period. Once the lease
> > expires and isn’t renewed, the server may safely reclaim the snapshot.
> > The term "lease" is widely used in distributed systems (e.g., HDFS,
> > etcd) for exactly this purpose.
> >
> > We can then update the interfaces accordingly:
> >
> > ```
> > CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
> >         String leaseId,
> >         Map<TableBucket, Long> snapshotIds,
> >         long leaseDuration);
> >
> > CompletableFuture<Void> renewKvSnapshotLease(
> >         String leaseId,
> >         long leaseDuration);
> >
> > CompletableFuture<Void> releaseKvSnapshotLease(
> >         String leaseId,
> >         Set<TableBucket> bucketsToRelease);
> >
> > CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
> > ```
> >
> > What do you think?
> >
> > Other minor suggestions:
> >
> > (1) I think we may need to use a compact binary format for the
> > znode instead of JSON. I'm afraid the node could easily exceed
> > 1 MB if JSON is used.
> >
> > (2) We should retain at least 2 snapshots by default, as a best
> > practice. This doesn't add much remote storage (only one
> > incremental snapshot's size) and only doubles the znode overhead,
> > while greatly simplifying the implementation by avoiding the
> > `failedTableBucketSet` handling. We can still introduce the
> > `failedTableBucketSet` later, as an optimization to reduce
> > numRetain to one.
> >
> > Best,
> > Jark
> >
> > On Sun, 21 Dec 2025 at 11:27, Yunhong Zheng <[email protected]> wrote:
> > >
> > > Hi Liebing,
> > >
> > > Thanks for the thoughtful feedback. We indeed need to consider the
> > > problem of partition expiration. In the short term, I personally think
> > > we could choose to ignore partition expiration, i.e., prevent expired
> > > partitions from triggering failover in read jobs or tiering jobs.
> > > There's no need to introduce partition consumer logic, which would
> > > increase implementation complexity. What do you think?
> > >
> > > Best regards,
> > > Yunhong
> > >
> > > On 2025/12/18 07:01:10 Liebing Yu wrote:
> > > > Hi Yunhong!
> > > >
> > > > Thanks for your great work. I have a few questions:
> > > >
> > > > 1. Regarding Partition Expiration and Data Consistency:
> > > > For KV tables, snapshot expiration significantly impacts consumers.
> > > > However, for both KV and Log tables, partition expiration is another
> > > > critical factor. Imagine a Flink job reading a Fluss partition that
> expires
> > > > due to TTL; in this case, data already written to that partition
> will not
> > > > be fully consumed.
> > > >
> > > > This creates a more pronounced data consistency issue in "union read"
> > > > scenarios (integrated lake-stream tables). Consider this scenario:
> > > > A Flink job starts consuming a partition where some data has already
> been
> > > > synced to the lake by the tiering service. If the partition expires
> in
> > > > Fluss during the consumption process (even if its data has been fully
> > > > synced to the lake), the Flink job will end up with incomplete
> streaming
> > > > data. However, a subsequent batch read of the same partition would
> retrieve
> > > > the full data from the lake. This leads to a stream-batch
> inconsistency—the
> > > > same partition yields different results depending on whether it's
> read via
> > > > the stream or the batch path.
> > > >
> > > > 2. Regarding the Consumer Framework:
> > > > Beyond the KV consumer, the issues mentioned above suggest that we
> may need
> > > > a Partition Consumer in the future. In fact, it might be beneficial
> to
> > > > introduce a generic Consumer concept for any ordinary job, allowing
> Fluss
> > > > to monitor the consumption progress of various clients.
> > > >
> > > > My question is: Should we take these factors into account now and
> establish
> > > > a more comprehensive consumer framework? This would make it much
> easier to
> > > > integrate new consumer types in the future.
> > > >
> > > >
> > > > Best regards,
> > > > Liebing Yu
> > > >
> > > >
> > > > On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]>
> wrote:
> > > >
> > > > > Hi Yang,
> > > > >
> > > > > Thanks for the thoughtful feedback. Your analysis significantly
> > > > > improves the stability of this feature. I will address each problem
> > > > > individually:
> > > > >
> > > > > Re for question 1:
> > > > >
> > > > > Good catch! I encountered this issue during testing as well. The
> > > > > key problem is the time gap between fetching the latest kv snapshot
> > > > > and registering the kv snapshot consumer, which are not atomic
> > > > > operations. This was considered during the design phase, but to
> > > > > avoid altering the semantics of Admin#getLatestKvSnapshots, we chose
> > > > > to keep the register kv snapshot consumer as a separate API.
> > > > >
> > > > > The solution is to add a retry mechanism: the buckets in the
> > > > > failedTableBucketSet of the RegisterKvSnapshotResult returned by
> > > > > registerKvSnapshotConsumer must be retried repeatedly (get latest
> > > > > snapshot -> register kv snapshot consumer) until all buckets are
> > > > > successfully registered.
> > > > >
> > > > > ------
> > > > > Re for question 2:
> > > > >
> > > > > It's indeed worth retaining multiple snapshots per table bucket to
> > > > > reduce the impact on stability. The configuration
> > > > > 'kv.snapshot.num-retained' will also be kept as is for now. However,
> > > > > there is still a potential issue of excessive ZooKeeper metadata,
> > > > > which can be addressed in a separate FIP or issue.
> > > > >
> > > > > ------
> > > > > Re for question 3:
> > > > >
> > > > > For the following scenarios, we can consider them as rare cases or
> misuse:
> > > > >
> > > > > 1. A job fails over continuously and is only restored after a day or longer.
> > > > > 2. Consuming a snapshot takes an exceptionally long time.
> > > > > 3. The same consumerId is used by multiple jobs to consume
> multiple tables.
> > > > >
> > > > > These scenarios may lead to the snapshot being cleared before the
> user
> > > > > consumes it. For such cases, we can document them in the user
> guide, and
> > > > > users can also configure a larger expiration time to mitigate this
> problem.
> > > > >
> > > > >
> > > > > -----
> > > > > Re for question 4:
> > > > >
> > > > > This question involves a trade-off between two strategies:
> > > > >
> > > > > 1. Expiration time based on the first registration: this shortens
> > > > > the consumer's effective TTL, preventing snapshots from being
> > > > > retained for extended periods and avoiding metadata bloat.
> > > > > 2. Expiration time based on the latest registration: this minimizes
> > > > > the risk of snapshots being cleaned up during consumption.
> > > > >
> > > > > I personally lean toward the first strategy, but the second is
> also valid.
> > > > >
> > > > > Best regards,
> > > > > Yunhong
> > > > >
> > > > > On 2025/12/16 08:19:54 Yang Wang wrote:
> > > > > > Hi Yunhong,
> > > > > >
> > > > > > Thank you for driving this very useful initiative—it will
> significantly
> > > > > > improve the stability of data consumption and the overall user
> > > > > experience.
> > > > > > I generally agree with most aspects of the design, but I have a
> few minor
> > > > > > questions:
> > > > > >
> > > > > > 1. When Flink enumerates the snapshots it needs to subscribe to
> > > > > > and attempts to subscribe, what is the expected behavior if a
> > > > > > snapshot gets deleted just before the subscription request is
> > > > > > sent? (This scenario might occur in certain edge cases.)
> > > > > >
> > > > > > 2. Retaining only one snapshot by default could be risky.
> > > > > > Currently, neither the CoordinatorServer nor the TabletServer can
> > > > > > fully guarantee that snapshots registered in ZooKeeper are always
> > > > > > reliable and valid (although this is the design intention, there
> > > > > > might still be undiscovered bugs).
> > > > > >
> > > > > > 3. I didn't see any API for updating subscriptions. If a Flink job
> > > > > > experiences a prolonged failover for some reason, would the unread
> > > > > > snapshots expire as a result?
> > > > > >
> > > > > > 4. Related to the previous question: Is the TTL of a snapshot
> > > > > > calculated from the time it was registered, or from the last time
> > > > > > a consumer updated its subscription?
> > > > > >
> > > > > > Best regards,
> > > > > > Yang
> > > > > >
> > > > > > yunhong Zheng <[email protected]> 于2025年12月11日周四 22:40写道:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Currently, for the Fluss PrimaryKey Table, the number of kv
> > > > > > > snapshots retained per bucket is controlled by the server option
> > > > > > > `kv.snapshot.num-retained` (default value: 1). If this value is
> > > > > > > set too small, kv snapshots may be deleted while a consumer is
> > > > > > > still actively consuming them.
> > > > > > >
> > > > > > > This case will cause a Flink job that reads a PrimaryKey table
> > > > > > > to fail and be unable to restart from its previous state.
> > > > > > >
> > > > > > > To avoid this, the Fluss server needs to be aware of which
> > > > > > > consumers are actively consuming the corresponding kv snapshots,
> > > > > > > and must not delete those kv snapshots while they are being
> > > > > > > consumed.
> > > > > > >
> > > > > > > So, I'd like to propose FIP-22: Support Kv Snapshot Consumer[1].
> > > > > > >
> > > > > > > Any feedback or suggestions on this proposal are welcome!
> > > > > > >
> > > > > > > [1]:
> > > > > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> > > > > > >
> > > > > > > Regards,
> > > > > > > Yunhong
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>
