Hi Hongshun,

Thanks for the thoughtful feedback. I will address each problem individually.

Re question 1: It seems the solution is either to store `lease.id` in the
state or to give the option no default value. For this version, I prefer
giving it no default value, and we can gradually relax this restriction in
the future. WDYT?
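
To make the "no default value" option concrete, here is a minimal sketch of
what the check could look like. The class and method names are hypothetical
and only for illustration; the option key is the one quoted in your mail.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: resolves the lease id from the options, with no default. */
final class LeaseIdOption {
    // Option key as quoted in this thread.
    static final String KEY = "scan.kv.snapshot.lease-id";

    private LeaseIdOption() {}

    /** Returns the configured lease id, or fails fast if the user has not set one. */
    static String resolve(Map<String, String> options) {
        return Optional.ofNullable(options.get(KEY))
                .map(String::trim)
                .filter(id -> !id.isEmpty())
                .orElseThrow(() -> new IllegalArgumentException(
                        "'" + KEY + "' must be set explicitly; it has no default value."));
    }
}
```

With this, a job that forgets to set the lease id fails at startup instead of
silently creating a new random lease on every restart.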

------
Re question 2: For newly discovered partitions,
`acquireKvSnapshotLeaseRequest` will be called again to acquire the lease,
which does not conflict with release. The `acquireKvSnapshotLeaseRequest` is
handled as a coordinator event by the coordinator, and events are processed
in the order their requests were sent, so in theory there should be no
problems.
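
As a rough illustration of why ordering is not a concern once requests reach
the coordinator, here is a minimal sketch of a single-threaded event loop. It
is not the actual coordinator code: `LeaseEventLoop` and its methods are
hypothetical, and buckets are represented as plain strings. It assumes the
requests arrive in the order they were sent.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: lease events applied one at a time, in submission order. */
final class LeaseEventLoop implements AutoCloseable {
    // A single worker thread executes submitted events sequentially (FIFO).
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Buckets currently held by the lease; only touched by the worker thread.
    private final Set<String> leasedBuckets = new HashSet<>();

    CompletableFuture<Void> acquire(String bucket) {
        return CompletableFuture.runAsync(() -> leasedBuckets.add(bucket), worker);
    }

    CompletableFuture<Void> release(String bucket) {
        return CompletableFuture.runAsync(() -> leasedBuckets.remove(bucket), worker);
    }

    /** Returns a copy of the held buckets, taken after all earlier events ran. */
    CompletableFuture<Set<String>> snapshot() {
        return CompletableFuture.supplyAsync(() -> new HashSet<>(leasedBuckets), worker);
    }

    @Override
    public void close() {
        worker.shutdown();
    }
}
```

Because every event runs on the same thread, a release followed by an acquire
for the same bucket always ends with the bucket held, and the reverse order
always ends with it released; the two can never interleave.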

------
Re question 3: Reducing the pressure on ZooKeeper primarily refers to
limiting the storage size of an individual Znode. For example, if a single
Znode exceeds 2MB, it can cause abnormal connection issues between the
ZooKeeper client and server. Currently, the number of Znodes is not a
bottleneck, and this FIP also introduces the operational capability of
`dropKvSnapshotLease`.

Best,
Yunhong

On 2025/12/25 03:58:29 Hongshun Wang wrote:
> Hi YunHong,
> 
> Thanks for the great proposal. I have several questions:
> 1. It seems that scan.kv.snapshot.lease-id is controlled by configuration,
> and the default value is UUID.randomUUID(). If the user doesn't set this
> value, the UUID.randomUUID() value will change each time the job is
> restarted, and the old lease-id cannot be deleted until lease-duration
> (default value is 1 day) has elapsed.
> 
> 2. What happens for newly discovered buckets (for example, newly created
> partition tables)? Will a new lease-id be created, or will the existing
> lease-id be reused? If the newly discovered split1 is releasing the lease-id,
> and the newly discovered split1 is renewing this lease-id, how do we control
> the request order?
> 3. This FIP wants to decrease the pressure on ZooKeeper. But it seems that
> a lot of lease nodes will be created if there are many Flink jobs?
> 
> Best,
> Hongshun
> 
> On Wed, Dec 24, 2025 at 4:25 PM Yunhong Zheng <[email protected]> wrote:
> 
> > Hi Jark,
> >
> > Thanks for the thoughtful feedback.  The API changes are reasonable to me,
> > and the FIP has already been updated.
> >
> > Best,
> > Yunhong
> >
> > On 2025/12/21 14:37:45 Jark Wu wrote:
> > > Thanks for the great proposal.
> > >
> > > I think the current terminology may cause confusion about the
> > > relationship between "snapshot consumer" and Kafka-style consumer
> > > groups, similar to the consumer framework question raised by Liebing.
> > >
> > > Paimon uses the term "consumer ID" because its semantics are indeed
> > > analogous to Kafka consumer groups. However, Fluss’s snapshot
> > > "consumer" is fundamentally different: it’s solely used to pin KV
> > > snapshot files and does not track consumption offsets at all.
> > >
> > > Since this mechanism is designed specifically to prevent deletion and
> > > is typically time-bound (i.e., the snapshot shouldn’t be retained
> > > indefinitely if the client crashes), we should avoid the term
> > > "Consumer", as it usually implies long-lived, streaming consumption,
> > > and to avoid conflicts with the future "consumer group" in Fluss.
> > >
> > > Therefore, I suggest naming it "KV Snapshot Lease".
> > >
> > > The client "leases" the snapshot for a defined period. Once the lease
> > > expires and isn’t renewed, the server may safely reclaim the snapshot.
> > > The term "lease" is widely used in distributed systems (e.g., HDFS,
> > > etcd) for exactly this purpose.
> > >
> > > We can then update the interfaces accordingly:
> > >
> > > ```
> > > CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
> > > String leaseId,
> > > Map<TableBucket, Long> snapshotIds,
> > > long leaseDuration);
> > >
> > > CompletableFuture<Void> renewKvSnapshotLease(
> > > String leaseId,
> > > long leaseDuration);
> > >
> > > CompletableFuture<Void> releaseKvSnapshotLease(
> > > String leaseId,
> > > Set<TableBucket> bucketsToRelease);
> > >
> > > CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
> > > ```
> > >
> > > What do you think?
> > >
> > > Other minor suggestions:
> > >
> > > (1) I think we may need to use a compacted binary format for the
> > > zknode instead of using JSON. I'm afraid it will easily be greater
> > > than 1MB if using JSON.
> > >
> > > (2) We should at least retain 2 snapshots by default and for best
> > > practice. This doesn't increase lots of remote storage size (only one
> > > incremental size) and double zk node overhead. However, this can
> > > > greatly simplify the implementation without dealing with the
> > > `failedTableBucketSet`. We can still introduce the
> > > `failedTableBucketSet`, but use it for optimization in the future to
> > > reduce the numRetain to one.
> > >
> > > Best,
> > > Jark
> > >
> > > On Sun, 21 Dec 2025 at 11:27, Yunhong Zheng <[email protected]> wrote:
> > > >
> > > > Hi Liebing,
> > > >
> > > > Thanks for the thoughtful feedback. We indeed need to consider the
> > problem of partition expiration. In the short term, I personally think we
> > could choose to ignore partition expiration—preventing expired partitions
> > from triggering failover in read jobs or tiering jobs. There's no need to
> > introduce partition consumer logic, which would increase implementation
> > complexity. What do you think?
> > > >
> > > > Best regards,
> > > > Yunhong
> > > >
> > > > On 2025/12/18 07:01:10 Liebing Yu wrote:
> > > > > Hi Yunhong!
> > > > >
> > > > > Thanks for your great work. I have a few questions:
> > > > >
> > > > > 1. Regarding Partition Expiration and Data Consistency:
> > > > > For KV tables, snapshot expiration significantly impacts consumers.
> > > > > However, for both KV and Log tables, partition expiration is another
> > > > > critical factor. Imagine a Flink job reading a Fluss partition that
> > expires
> > > > > due to TTL; in this case, data already written to that partition
> > will not
> > > > > be fully consumed.
> > > > >
> > > > > This creates a more pronounced data consistency issue in "union read"
> > > > > scenarios (integrated lake-stream tables). Consider this scenario:
> > > > > A Flink job starts consuming a partition where some data has already
> > been
> > > > > synced to the lake by the tiering service. If the partition expires
> > in
> > > > > Fluss during the consumption process (even if its data has been fully
> > > > > synced to the lake), the Flink job will end up with incomplete
> > streaming
> > > > > data. However, a subsequent batch read of the same partition would
> > retrieve
> > > > > the full data from the lake. This leads to a stream-batch
> > inconsistency—the
> > > > > same partition yields different results depending on whether it's
> > read via
> > > > > the stream or the batch path.
> > > > >
> > > > > 2. Regarding the Consumer Framework:
> > > > > Beyond the KV consumer, the issues mentioned above suggest that we
> > may need
> > > > > a Partition Consumer in the future. In fact, it might be beneficial
> > to
> > > > > introduce a generic Consumer concept for any ordinary job, allowing
> > Fluss
> > > > > to monitor the consumption progress of various clients.
> > > > >
> > > > > My question is: Should we take these factors into account now and
> > establish
> > > > > a more comprehensive consumer framework? This would make it much
> > easier to
> > > > > integrate new consumer types in the future.
> > > > >
> > > > >
> > > > > Best regards,
> > > > > Liebing Yu
> > > > >
> > > > >
> > > > > On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]>
> > wrote:
> > > > >
> > > > > > Hi Yang,
> > > > > >
> > > > > > Thanks for the thoughtful feedback. Your analysis has significantly
> > > > > > > improved the stability of introducing this function. I will
> > address
> > > > > > each problem individually:
> > > > > >
> > > > > > Re for question 1:
> > > > > >
> > > > > > Good catch!  I encountered this issue during testing as well. The
> > key
> > > > > > problem is the time gap between fetching the latest kv snapshot and
> > > > > > registering the kv snapshot consumer, which are not atomic
> > operations. This
> > > > > > was considered during the design phase, but to avoid altering the
> > semantics
> > > > > > of Admin#getLatestKvSnapshots, we chose to keep the register kv
> > snapshot
> > > > > > consumer as a separate API.
> > > > > >
> > > > > > The solution is to add a retry mechanism: the failedTableBucketSet
> > in the
> > > > > > RegisterKvSnapshotResult returned by registerKvSnapshotConsumer
> > must be
> > > > > > retried repeatedly (Get latest snapshot -> register kv snapshot
> > consumer)
> > > > > > until all buckets are successfully registered.
> > > > > >
> > > > > > ------
> > > > > > Re for question 2:
> > > > > >
> > > > > > It's indeed worth considering continuing to retain multiple
> > snapshots for
> > > > > > one table bucket to reduce the impact on stability. The
> > configuration
> > > > > > 'kv.snapshot.num-retained' will also be kept as is for now.
> > However, there
> > > > > > might still be a potential issue: excessive ZooKeeper metadata,
> > which can
> > > > > > be addressed as a separate FIP or issue.
> > > > > >
> > > > > > ------
> > > > > > Re for question 3:
> > > > > >
> > > > > > For the following scenarios, we can consider them as rare cases or
> > misuse:
> > > > > >
> > > > > > > 1. A job keeps failing over and is restored after a day or longer.
> > > > > > 2. Consuming a snapshot takes an exceptionally long time.
> > > > > > 3. The same consumerId is used by multiple jobs to consume
> > multiple tables.
> > > > > >
> > > > > > These scenarios may lead to the snapshot being cleared before the
> > user
> > > > > > consumes it. For such cases, we can document them in the user
> > guide, and
> > > > > > users can also configure a larger expiration time to mitigate this
> > problem.
> > > > > >
> > > > > >
> > > > > > -----
> > > > > > Re for question 4:
> > > > > >
> > > > > > This question involves a trade-off between two strategies:
> > > > > >
> > > > > > 1. Expiration time based on the first registration: This
> > accelerates the
> > > > > > consumer's TTL, preventing snapshots from being retained for
> > extended
> > > > > > periods and avoiding metadata bloat.
> > > > > > 2. Expiration time based on the latest registration: This
> > minimizes the
> > > > > > risk of snapshots being cleaned up during consumption.
> > > > > >
> > > > > > I personally lean toward the first strategy, but the second is
> > also valid.
> > > > > >
> > > > > > Best regards,
> > > > > > Yunhong
> > > > > >
> > > > > > On 2025/12/16 08:19:54 Yang Wang wrote:
> > > > > > > Hi Yunhong,
> > > > > > >
> > > > > > > Thank you for driving this very useful initiative—it will
> > significantly
> > > > > > > improve the stability of data consumption and the overall user
> > > > > > experience.
> > > > > > > I generally agree with most aspects of the design, but I have a
> > few minor
> > > > > > > questions:
> > > > > > >
> > > > > > >    1.
> > > > > > >
> > > > > > >    When Flink enumerates the snapshots it needs to subscribe to
> > and
> > > > > > >    attempts to subscribe, what is the expected behavior if a
> > snapshot
> > > > > > gets
> > > > > > >    deleted just before the subscription request is sent? (This
> > scenario
> > > > > > might
> > > > > > >    occur in certain edge cases.)
> > > > > > >    2.
> > > > > > >
> > > > > > >    Retaining only one snapshot by default could be risky.
> > Currently,
> > > > > > >    neither the CoordinatorServer nor the TabletServer can fully
> > > > > > guarantee that
> > > > > > >    snapshots registered in ZooKeeper are always reliable and
> > valid
> > > > > > (although
> > > > > > >    this is the design intention, there might still be
> > undiscovered bugs).
> > > > > > >    3.
> > > > > > >
> > > > > > >    I didn’t see any API for updating subscriptions. If a Flink
> > job
> > > > > > >    experiences a prolonged failover for some reason, would the
> > unread
> > > > > > >    snapshots expire as a result?
> > > > > > >    4.
> > > > > > >
> > > > > > >    Related to the previous question: Is the TTL of a snapshot
> > calculated
> > > > > > >    from the time it was registered, or from the last time a
> > consumer
> > > > > > updated
> > > > > > >    its subscription?
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Yang
> > > > > > >
> > > > > > > yunhong Zheng <[email protected]> 于2025年12月11日周四 22:40写道:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Currently, for Fluss PrimaryKey Table, the number of kv
> > snapshots
> > > > > > retained
> > > > > > > > per bucket is controlled by the server
> > > > > > > > option `kv.snapshot.num-retained`  (default value: 1). If this
> > value
> > > > > > is set
> > > > > > > > too small, Kv snapshots that are being actively consumed may
> > be deleted
> > > > > > > > while a consumer is still consuming them.
> > > > > > > >
> > > > > > > > This case will cause a Flink job which read PrimaryKey table
> > fail and
> > > > > > > > cannot be restarted from its previous state.
> > > > > > > >
> > > > > > > > To avoid this case, the fluss server needs to be aware of which
> > > > > > consumers
> > > > > > > > are actively consuming the corresponding kv snapshots, and can
> > not
> > > > > > delete
> > > > > > > > these kv snapshots that are currently being consumed.
> > > > > > > >
> > > > > > > > So, I'd like to propose FIP-22: Support Kv Snapshot
> > Consumer[1].
> > > > > > > >
> > > > > > > > > Any feedback or suggestions on this proposal are welcome!
> > > > > > > >
> > > > > > > > [1]:
> > > > > > > >
> > > > > > > >
> > > > > >
> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Yunhong
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> 
